diff --git a/handler/src/main/java/io/netty/handler/queue/BlockingReadHandler.java b/handler/src/main/java/io/netty/handler/queue/BlockingReadHandler.java deleted file mode 100644 index ad4f854abf..0000000000 --- a/handler/src/main/java/io/netty/handler/queue/BlockingReadHandler.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.handler.queue; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.MessageBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.BlockingOperationException; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundMessageHandlerAdapter; -import io.netty.channel.ChannelPipeline; -import io.netty.util.internal.QueueFactory; - -import java.io.IOException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * Emulates blocking read operation. This handler stores all received messages - * into a {@link BlockingQueue} and returns the received messages when - * {@link #read()}, {@link #read(long, TimeUnit)}, {@link #readEvent()}, or - * {@link #readEvent(long, TimeUnit)} method is called. - *

- * Please note that this handler is only useful for the cases where there are - * very small number of connections, such as testing and simple client-side - * application development. - *

- * Also, any handler placed after this handler will never receive - * {@code messageReceived}, {@code exceptionCaught}, and {@code channelClosed} - * events, hence it should be placed in the last place in a pipeline. - *

- * Here is an example that demonstrates the usage: - *

- * {@link BlockingReadHandler}<{@link ByteBuf}> reader =
- *         new {@link BlockingReadHandler}<{@link ByteBuf}>();
- * {@link ChannelPipeline} p = ...;
- * p.addLast("reader", reader);
- *
- * ...
- *
- * // Read a message from a channel in a blocking manner.
- * try {
- *     {@link ByteBuf} buf = reader.read(60, TimeUnit.SECONDS);
- *     if (buf == null) {
- *         // Connection closed.
- *     } else {
- *         // Handle the received message here.
- *     }
- * } catch ({@link BlockingReadTimeoutException} e) {
- *     // Read timed out.
- * } catch (IOException e) {
- *     // Other read errors
- * }
- * 
- * - * @param the type of the received messages - */ -public class BlockingReadHandler extends ChannelInboundMessageHandlerAdapter { - - private static final Object INACTIVE = new Object(); - - private volatile ChannelHandlerContext ctx; - private final BlockingQueue queue; - private volatile boolean closed; - - /** - * Creates a new instance with the default unbounded {@link BlockingQueue} - * implementation. - */ - public BlockingReadHandler() { - this(QueueFactory.createQueue()); - } - - /** - * Creates a new instance with the specified {@link BlockingQueue}. - */ - public BlockingReadHandler(BlockingQueue queue) { - if (queue == null) { - throw new NullPointerException("queue"); - } - this.queue = queue; - } - - @Override - public MessageBuf newInboundBuffer( - ChannelHandlerContext ctx) throws Exception { - this.ctx = ctx; - return Unpooled.wrappedBuffer(queue); - } - - /** - * Returns {@code true} if and only if the {@link Channel} associated with - * this handler has been closed. - */ - public boolean isClosed() { - return closed; - } - - /** - * Waits until a new message is received or the associated {@link Channel} - * is closed. - * - * @return the received message or {@code null} if the associated - * {@link Channel} has been closed - * @throws IOException - * if failed to receive a new message - * @throws InterruptedException - * if the operation has been interrupted - */ - public E read() throws IOException, InterruptedException { - return filter(readEvent()); - } - - /** - * Waits until a new message is received or the associated {@link Channel} - * is closed. - * - * @param timeout - * the amount time to wait until a new message is received. - * If no message is received within the timeout, - * {@link BlockingReadTimeoutException} is thrown. - * @param unit - * the unit of {@code timeout} - * - * @return the received message or {@code null} if the associated - * {@link Channel} has been closed - * @throws BlockingReadTimeoutException - * if no message was received within the specified timeout - * @throws IOException - * if failed to receive a new message - * @throws InterruptedException - * if the operation has been interrupted - */ - public E read(long timeout, TimeUnit unit) throws IOException, InterruptedException { - return filter(readEvent(timeout, unit)); - } - - @SuppressWarnings("unchecked") - private E filter(Object e) throws IOException { - if (e == null || e == INACTIVE) { - return null; - } - - if (e instanceof IOException) { - throw (IOException) e; - } - if (e instanceof Throwable) { - throw (IOException) new IOException().initCause((Throwable) e); - } - - return (E) e; - } - - private Object readEvent() throws InterruptedException { - detectDeadLock(); - if (isClosed()) { - return null; - } - - return queue.take(); - } - - private Object readEvent(long timeout, TimeUnit unit) throws InterruptedException, BlockingReadTimeoutException { - detectDeadLock(); - if (isClosed()) { - return null; - } - - Object o = queue.poll(timeout, unit); - if (o == null) { - throw new BlockingReadTimeoutException(); - } else { - return o; - } - } - - private void detectDeadLock() { - if (ctx.executor().inEventLoop()) { - throw new BlockingOperationException(); - } - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - addEvent(INACTIVE); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, - Throwable cause) throws Exception { - addEvent(cause); - } - - @Override - public void afterRemove(ChannelHandlerContext ctx) throws Exception { - addEvent(INACTIVE); - } - - private void addEvent(Object e) { - if (!closed) { - if (e == INACTIVE) { - closed = true; - } - queue.add(e); - } - } - - @Override - public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { - // TODO Auto-generated method stub - - } -} diff --git a/handler/src/main/java/io/netty/handler/queue/BlockingReadTimeoutException.java b/handler/src/main/java/io/netty/handler/queue/BlockingReadTimeoutException.java deleted file mode 100644 index 6c1cc7ffd8..0000000000 --- a/handler/src/main/java/io/netty/handler/queue/BlockingReadTimeoutException.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.handler.queue; - -import java.io.IOException; -import java.io.InterruptedIOException; - -/** - * A {@link IOException} raised by {@link BlockingReadHandler} when no data - * was read within a certain period of time. - */ -public class BlockingReadTimeoutException extends InterruptedIOException { - - private static final long serialVersionUID = 356009226872649493L; - - /** - * Creates a new instance. - */ - public BlockingReadTimeoutException() { - } - - /** - * Creates a new instance. - */ - public BlockingReadTimeoutException(String message, Throwable cause) { - super(message); - initCause(cause); - } - - /** - * Creates a new instance. - */ - public BlockingReadTimeoutException(String message) { - super(message); - } - - /** - * Creates a new instance. - */ - public BlockingReadTimeoutException(Throwable cause) { - initCause(cause); - } -} diff --git a/handler/src/main/java/io/netty/handler/queue/package-info.java b/handler/src/main/java/io/netty/handler/queue/package-info.java deleted file mode 100644 index 35b94f9f44..0000000000 --- a/handler/src/main/java/io/netty/handler/queue/package-info.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -/** - * The special-purpose handlers that store an event into an internal queue - * instead of propagating the event immediately. - * - * @apiviz.exclude \.channel\. - * @apiviz.exclude Exception$ - */ -package io.netty.handler.queue;