From aa5510e3b289a8044593bfd173a0f762bf1842ef Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 29 Oct 2011 20:27:58 +0200 Subject: [PATCH] Remove in favor of IOStreamChannelFactory --- .../BlockingChannelBufferInputStream.java | 170 ---------------- .../handler/stream/ChannelOutputStream.java | 80 -------- .../netty/handler/stream/StreamHandler.java | 181 ------------------ 3 files changed, 431 deletions(-) delete mode 100644 src/main/java/org/jboss/netty/handler/stream/BlockingChannelBufferInputStream.java delete mode 100644 src/main/java/org/jboss/netty/handler/stream/ChannelOutputStream.java delete mode 100644 src/main/java/org/jboss/netty/handler/stream/StreamHandler.java diff --git a/src/main/java/org/jboss/netty/handler/stream/BlockingChannelBufferInputStream.java b/src/main/java/org/jboss/netty/handler/stream/BlockingChannelBufferInputStream.java deleted file mode 100644 index 146aacc69f..0000000000 --- a/src/main/java/org/jboss/netty/handler/stream/BlockingChannelBufferInputStream.java +++ /dev/null @@ -1,170 +0,0 @@ -package org.jboss.netty.handler.stream; - -import java.io.IOException; -import java.io.InputStream; - -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; - -/** - * {@link InputStream} implementation which can be used to write {@link ChannelBuffer} - * objects to and read them - * - * - * @author The Netty Project - * @author Norman Maurer - */ -public class BlockingChannelBufferInputStream extends InputStream{ - private final Object mutex = new Object(); - - private final ChannelBuffer buf; - - private volatile boolean closed; - - private volatile boolean released; - - private IOException exception; - - public BlockingChannelBufferInputStream() { - buf = ChannelBuffers.dynamicBuffer(); - } - - @Override - public int available() { - if (released) { - return 0; - } - - synchronized (mutex) { - return buf.readableBytes(); - } - } - - @Override - public void close() { - if (closed) { - return; - } - - synchronized (mutex) { - closed = true; - releaseBuffer(); - - mutex.notifyAll(); - } - } - - @Override - public int read() throws IOException { - synchronized (mutex) { - if (!waitForData()) { - return -1; - } - - return buf.readByte() & 0xff; - } - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - synchronized (mutex) { - if (!waitForData()) { - return -1; - } - - int readBytes; - - if (len > buf.readableBytes()) { - readBytes = buf.readableBytes(); - } else { - readBytes = len; - } - - buf.readBytes(b, off, readBytes); - - return readBytes; - } - } - - private boolean waitForData() throws IOException { - if (released) { - return false; - } - - synchronized (mutex) { - while (!released && buf.readableBytes() == 0 && exception == null) { - try { - mutex.wait(); - } catch (InterruptedException e) { - IOException ioe = new IOException( - "Interrupted while waiting for more data"); - ioe.initCause(e); - throw ioe; - } - } - } - - if (exception != null) { - releaseBuffer(); - throw exception; - } - - if (closed && buf.readableBytes() == 0) { - releaseBuffer(); - - return false; - } - - return true; - } - - private void releaseBuffer() { - if (released) { - return; - } - - released = true; - } - - /** - * Write the {@link ChannelBuffer} to {@link InputStream} and unblock the - * read methods - * - * @param src buffer - */ - public void write(ChannelBuffer src) { - synchronized (mutex) { - if (closed) { - return; - } - - if (buf.readable()) { - - this.buf.writeBytes(src); - //this.buf.readerIndex(0); - } else { - this.buf.clear(); - this.buf.writeBytes(src); - this.buf.readerIndex(0); - mutex.notifyAll(); - } - } - } - - /** - * Throw the given {@link IOException} on the next read call - * - * @param e - */ - public void throwException(IOException e) { - synchronized (mutex) { - if (exception == null) { - exception = e; - - mutex.notifyAll(); - } - } - } - - -} diff --git a/src/main/java/org/jboss/netty/handler/stream/ChannelOutputStream.java b/src/main/java/org/jboss/netty/handler/stream/ChannelOutputStream.java deleted file mode 100644 index 2810ff93cc..0000000000 --- a/src/main/java/org/jboss/netty/handler/stream/ChannelOutputStream.java +++ /dev/null @@ -1,80 +0,0 @@ -package org.jboss.netty.handler.stream; - -import java.io.IOException; -import java.io.OutputStream; - -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; - -/** - * {@link OutputStream} which write data to the wrapped {@link Channel} - * - * @author The Netty Project - * @author Norman Maurer - */ -public class ChannelOutputStream extends OutputStream{ - - private final Channel channel; - - private ChannelFuture lastChannelFuture; - - public ChannelOutputStream(Channel channel) { - this.channel = channel; - } - - @Override - public void close() throws IOException { - try { - flush(); - } finally { - channel.close().awaitUninterruptibly(); - } - } - - private void checkClosed() throws IOException { - if (!channel.isConnected()) { - throw new IOException("The session has been closed."); - } - } - - private synchronized void write(ChannelBuffer buf) throws IOException { - checkClosed(); - ChannelFuture future = channel.write(buf); - lastChannelFuture = future; - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - write(ChannelBuffers.copiedBuffer(b.clone(), off, len)); - } - - @Override - public void write(int b) throws IOException { - ChannelBuffer buf = ChannelBuffers.buffer(1); - buf.writeByte((byte) b); - write(buf); - } - - @Override - public synchronized void flush() throws IOException { - if (lastChannelFuture == null) { - return; - } - - lastChannelFuture.awaitUninterruptibly(); - if (!lastChannelFuture.isSuccess()) { - Throwable t = lastChannelFuture.getCause(); - if (t != null) { - throw new IOException( - "The bytes could not be written to the session", t); - } else { - throw new IOException( - "The bytes could not be written to the session"); - } - - } - } - -} diff --git a/src/main/java/org/jboss/netty/handler/stream/StreamHandler.java b/src/main/java/org/jboss/netty/handler/stream/StreamHandler.java deleted file mode 100644 index 2eb98bdef8..0000000000 --- a/src/main/java/org/jboss/netty/handler/stream/StreamHandler.java +++ /dev/null @@ -1,181 +0,0 @@ -package org.jboss.netty.handler.stream; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.SocketTimeoutException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.handler.timeout.ReadTimeoutHandler; -import org.jboss.netty.util.Timer; - -/** - * Abstract base class which could be used if you need to use {@link InputStream} and {@link OutputStream} directly in your Handler. - * Because of the blocking nature of {@link InputStream} it will spawn a new Thread on every new connected {@link Channel} - * - * @author The Netty Project - * @author Norman Maurer - */ -public abstract class StreamHandler extends ReadTimeoutHandler{ - - private final ExecutorService executor; - - private static final String KEY_IN = "stream.in"; - private static final String KEY_OUT = "stream.out"; - - - /** - * Create a new Instance which use a cached ThreadPool with no limit to perform the stream handling - * - * @param timer the {@link Timer} which is used for the timeout calculation - * @param readTimeout the read timeout. After the timeout a {@link ReadTimeOutException} will be thrown - * @param unit the {@link TimeUnit} to use for the readerIdleTime - */ - public StreamHandler(Timer timer, long readTimeout, TimeUnit unit) { - this(timer, readTimeout, unit, Executors.newCachedThreadPool()); - } - - - /** - * Create a new Instance which use thre give {@link ExecutorService} to perform the stream handling - * - * @param timer the {@link Timer} which is used for the timeout calculation - * @param readTimeout the read timeout in seconds. After the timeout a {@link ReadTimeOutException} will be thrown - * @param unit the {@link TimeUnit} to use for the readerIdleTime - * @param executor the {@link ExecutorService} to use for off load the blocking tasks - */ - public StreamHandler(Timer timer, long readTimeout, TimeUnit unit, ExecutorService executor) { - super(timer, readTimeout, unit); - this.executor = executor; - } - - - /** - * Implement this method to execute your stream I/O logic - * - * The method will get executed in a new Thread - * - */ - protected abstract void processStreamIo(final ChannelHandlerContext ctx, final InputStream in, - OutputStream out); - - - - /** - * Fire of the {@link #processStreamIo(ChannelHandlerContext, InputStream, OutputStream)} method - */ - @Override - public void channelConnected(final ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - - // Create streams - final InputStream in = new BlockingChannelBufferInputStream(); - final OutputStream out = new ChannelOutputStream(ctx.getChannel()); - Map attachment = getAttachment(ctx); - attachment.put(KEY_IN, in); - attachment.put(KEY_OUT, out); - executor.execute(new Runnable() { - - public void run() { - processStreamIo(ctx, in, out); - } - }); - ctx.setAttachment(attachment); - super.channelConnected(ctx, e); - } - - /** - * Return the Map which is used as Attachment to the {@link ChannelHandlerContext} - * - * You should use this map if you need to store attachments on the {@link ChannelHandlerContext} - * - * @param ctx - * @return attachmentMap - */ - @SuppressWarnings("unchecked") - protected final Map getAttachment(ChannelHandlerContext ctx) { - Map attachment = (Map) ctx.getAttachment(); - if (attachment == null) { - attachment = new HashMap(); - ctx.setAttachment(attachment); - } - return attachment; - } - /** - * Closes streams - */ - @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - Map attachment = getAttachment(ctx); - - final InputStream in = (InputStream) attachment.get(KEY_IN); - final OutputStream out = (OutputStream) attachment.get(KEY_OUT); - try { - in.close(); - } finally { - out.close(); - } - super.channelClosed(ctx, e); - } - - - /** - * Forwards read data to input stream. - */ - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - final BlockingChannelBufferInputStream in = (BlockingChannelBufferInputStream) getAttachment(ctx).get(KEY_IN); - in.write((ChannelBuffer) e.getMessage()); - super.messageReceived(ctx, e); - } - - /** - * Forwards caught exceptions to input stream. - */ - @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { - final BlockingChannelBufferInputStream in = (BlockingChannelBufferInputStream) getAttachment(ctx).get(KEY_IN); - IOException ex = null; - if (e.getCause() instanceof ReadTimeOutException) { - ex = (IOException) e.getCause().getCause(); - } else if (e.getCause() instanceof IOException) { - ex = (IOException) e.getCause(); - } - - if (e != null && in != null) { - in.throwException(ex); - } else { - ctx.getChannel().close(); - } - } - - - @Override - protected void readTimedOut(ChannelHandlerContext ctx) throws Exception { - throw new ReadTimeOutException(new SocketTimeoutException("Read timeout")); - } - - - /** - * - * Exception thrown on a read timeout - * - */ - private static class ReadTimeOutException extends RuntimeException { - private static final long serialVersionUID = 3976736960742503222L; - - public ReadTimeOutException(IOException cause) { - super(cause); - } - } - -}