From 0fbe65b0763ff2f835a38825fff98d995a8946fb Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sun, 23 Oct 2011 19:32:00 +0200 Subject: [PATCH] Add the StreamHandler which can be used to adapt old blocking io network code to netty. See NETTY-307 --- .../BlockingChannelBufferInputStream.java | 169 ++++++++++++++++ .../handler/stream/ChannelOutputStream.java | 79 ++++++++ .../netty/handler/stream/StreamHandler.java | 180 ++++++++++++++++++ 3 files changed, 428 insertions(+) create mode 100644 src/main/java/org/jboss/netty/handler/stream/BlockingChannelBufferInputStream.java create mode 100644 src/main/java/org/jboss/netty/handler/stream/ChannelOutputStream.java create mode 100644 src/main/java/org/jboss/netty/handler/stream/StreamHandler.java diff --git a/src/main/java/org/jboss/netty/handler/stream/BlockingChannelBufferInputStream.java b/src/main/java/org/jboss/netty/handler/stream/BlockingChannelBufferInputStream.java new file mode 100644 index 0000000000..1d78778cc2 --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/stream/BlockingChannelBufferInputStream.java @@ -0,0 +1,169 @@ +package org.jboss.netty.handler.stream; + +import java.io.IOException; +import java.io.InputStream; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; + +/** + * {@link InputStream} implementation which can be used to write {@link ChannelBuffer} + * objects to and read them + * + * + * @author Norman Maurer + */ +public class BlockingChannelBufferInputStream extends InputStream{ + private final Object mutex = new Object(); + + private final ChannelBuffer buf; + + private volatile boolean closed; + + private volatile boolean released; + + private IOException exception; + + public BlockingChannelBufferInputStream() { + buf = ChannelBuffers.dynamicBuffer(); + } + + @Override + public int available() { + if (released) { + return 0; + } + + synchronized (mutex) { + return buf.readableBytes(); + } + } + + @Override + public void close() { + if (closed) { + return; + } + + synchronized (mutex) { + closed = true; + releaseBuffer(); + + mutex.notifyAll(); + } + } + + @Override + public int read() throws IOException { + synchronized (mutex) { + if (!waitForData()) { + return -1; + } + + return buf.readByte() & 0xff; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + synchronized (mutex) { + if (!waitForData()) { + return -1; + } + + int readBytes; + + if (len > buf.readableBytes()) { + readBytes = buf.readableBytes(); + } else { + readBytes = len; + } + + buf.readBytes(b, off, readBytes); + + return readBytes; + } + } + + private boolean waitForData() throws IOException { + if (released) { + return false; + } + + synchronized (mutex) { + while (!released && buf.readableBytes() == 0 && exception == null) { + try { + mutex.wait(); + } catch (InterruptedException e) { + IOException ioe = new IOException( + "Interrupted while waiting for more data"); + ioe.initCause(e); + throw ioe; + } + } + } + + if (exception != null) { + releaseBuffer(); + throw exception; + } + + if (closed && buf.readableBytes() == 0) { + releaseBuffer(); + + return false; + } + + return true; + } + + private void releaseBuffer() { + if (released) { + return; + } + + released = true; + } + + /** + * Write the {@link ChannelBuffer} to {@link InputStream} and unblock the + * read methods + * + * @param src buffer + */ + public void write(ChannelBuffer src) { + synchronized (mutex) { + if (closed) { + return; + } + + if (buf.readable()) { + + this.buf.writeBytes(src); + //this.buf.readerIndex(0); + } else { + this.buf.clear(); + this.buf.writeBytes(src); + this.buf.readerIndex(0); + mutex.notifyAll(); + } + } + } + + /** + * Throw the given {@link IOException} on the next read call + * + * @param e + */ + public void throwException(IOException e) { + synchronized (mutex) { + if (exception == null) { + exception = e; + + mutex.notifyAll(); + } + } + } + + +} diff --git a/src/main/java/org/jboss/netty/handler/stream/ChannelOutputStream.java b/src/main/java/org/jboss/netty/handler/stream/ChannelOutputStream.java new file mode 100644 index 0000000000..2a8fddeadf --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/stream/ChannelOutputStream.java @@ -0,0 +1,79 @@ +package org.jboss.netty.handler.stream; + +import java.io.IOException; +import java.io.OutputStream; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; + +/** + * {@link OutputStream} which write data to the wrapped {@link Channel} + * + * @author Norman Maurer + */ +public class ChannelOutputStream extends OutputStream{ + + private final Channel channel; + + private ChannelFuture lastChannelFuture; + + public ChannelOutputStream(Channel channel) { + this.channel = channel; + } + + @Override + public void close() throws IOException { + try { + flush(); + } finally { + channel.close().awaitUninterruptibly(); + } + } + + private void checkClosed() throws IOException { + if (!channel.isConnected()) { + throw new IOException("The session has been closed."); + } + } + + private synchronized void write(ChannelBuffer buf) throws IOException { + checkClosed(); + ChannelFuture future = channel.write(buf); + lastChannelFuture = future; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + write(ChannelBuffers.copiedBuffer(b.clone(), off, len)); + } + + @Override + public void write(int b) throws IOException { + ChannelBuffer buf = ChannelBuffers.buffer(1); + buf.writeByte((byte) b); + write(buf); + } + + @Override + public synchronized void flush() throws IOException { + if (lastChannelFuture == null) { + return; + } + + lastChannelFuture.awaitUninterruptibly(); + if (!lastChannelFuture.isSuccess()) { + Throwable t = lastChannelFuture.getCause(); + if (t != null) { + throw new IOException( + "The bytes could not be written to the session", t); + } else { + throw new IOException( + "The bytes could not be written to the session"); + } + + } + } + +} diff --git a/src/main/java/org/jboss/netty/handler/stream/StreamHandler.java b/src/main/java/org/jboss/netty/handler/stream/StreamHandler.java new file mode 100644 index 0000000000..e2f89e5270 --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/stream/StreamHandler.java @@ -0,0 +1,180 @@ +package org.jboss.netty.handler.stream; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.SocketTimeoutException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.handler.timeout.ReadTimeoutHandler; +import org.jboss.netty.util.Timer; + +/** + * Abstract base class which could be used if you need to use {@link InputStream} and {@link OutputStream} directly in your Handler. + * Because of the blocking nature of {@link InputStream} it will spawn a new Thread on every new connected {@link Channel} + * + * @author Norman Maurer + */ +public abstract class StreamHandler extends ReadTimeoutHandler{ + + private final ExecutorService executor; + + private static final String KEY_IN = "stream.in"; + private static final String KEY_OUT = "stream.out"; + + + /** + * Create a new Instance which use a cached ThreadPool with no limit to perform the stream handling + * + * @param timer the {@link Timer} which is used for the timeout calculation + * @param readTimeout the read timeout. After the timeout a {@link ReadTimeOutException} will be thrown + * @param unit the {@link TimeUnit} to use for the readerIdleTime + */ + public StreamHandler(Timer timer, long readTimeout, TimeUnit unit) { + this(timer, readTimeout, unit, Executors.newCachedThreadPool()); + } + + + /** + * Create a new Instance which use thre give {@link ExecutorService} to perform the stream handling + * + * @param timer the {@link Timer} which is used for the timeout calculation + * @param readTimeout the read timeout in seconds. After the timeout a {@link ReadTimeOutException} will be thrown + * @param unit the {@link TimeUnit} to use for the readerIdleTime + * @param executor the {@link ExecutorService} to use for off load the blocking tasks + */ + public StreamHandler(Timer timer, long readTimeout, TimeUnit unit, ExecutorService executor) { + super(timer, readTimeout, unit); + this.executor = executor; + } + + + /** + * Implement this method to execute your stream I/O logic + * + * The method will get executed in a new Thread + * + */ + protected abstract void processStreamIo(final ChannelHandlerContext ctx, final InputStream in, + OutputStream out); + + + + /** + * Fire of the {@link #processStreamIo(ChannelHandlerContext, InputStream, OutputStream)} method + */ + @Override + public void channelConnected(final ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { + + // Create streams + final InputStream in = new BlockingChannelBufferInputStream(); + final OutputStream out = new ChannelOutputStream(ctx.getChannel()); + Map attachment = getAttachment(ctx); + attachment.put(KEY_IN, in); + attachment.put(KEY_OUT, out); + executor.execute(new Runnable() { + + public void run() { + processStreamIo(ctx, in, out); + } + }); + ctx.setAttachment(attachment); + super.channelConnected(ctx, e); + } + + /** + * Return the Map which is used as Attachment to the {@link ChannelHandlerContext} + * + * You should use this map if you need to store attachments on the {@link ChannelHandlerContext} + * + * @param ctx + * @return attachmentMap + */ + @SuppressWarnings("unchecked") + protected final Map getAttachment(ChannelHandlerContext ctx) { + Map attachment = (Map) ctx.getAttachment(); + if (attachment == null) { + attachment = new HashMap(); + ctx.setAttachment(attachment); + } + return attachment; + } + /** + * Closes streams + */ + @Override + public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { + Map attachment = getAttachment(ctx); + + final InputStream in = (InputStream) attachment.get(KEY_IN); + final OutputStream out = (OutputStream) attachment.get(KEY_OUT); + try { + in.close(); + } finally { + out.close(); + } + super.channelClosed(ctx, e); + } + + + /** + * Forwards read data to input stream. + */ + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { + final BlockingChannelBufferInputStream in = (BlockingChannelBufferInputStream) getAttachment(ctx).get(KEY_IN); + in.write((ChannelBuffer) e.getMessage()); + super.messageReceived(ctx, e); + } + + /** + * Forwards caught exceptions to input stream. + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { + final BlockingChannelBufferInputStream in = (BlockingChannelBufferInputStream) getAttachment(ctx).get(KEY_IN); + IOException ex = null; + if (e.getCause() instanceof ReadTimeOutException) { + ex = (IOException) e.getCause().getCause(); + } else if (e.getCause() instanceof IOException) { + ex = (IOException) e.getCause(); + } + + if (e != null && in != null) { + in.throwException(ex); + } else { + ctx.getChannel().close(); + } + } + + + @Override + protected void readTimedOut(ChannelHandlerContext ctx) throws Exception { + throw new ReadTimeOutException(new SocketTimeoutException("Read timeout")); + } + + + /** + * + * Exception thrown on a read timeout + * + */ + private static class ReadTimeOutException extends RuntimeException { + private static final long serialVersionUID = 3976736960742503222L; + + public ReadTimeOutException(IOException cause) { + super(cause); + } + } + +}