Add the StreamHandler which can be used to adapt old blocking io network

code to netty. See NETTY-307
This commit is contained in:
Norman Maurer 2011-10-23 19:32:00 +02:00
parent 579ddf72fc
commit 0fbe65b076
3 changed files with 428 additions and 0 deletions

View File

@ -0,0 +1,169 @@
package org.jboss.netty.handler.stream;
import java.io.IOException;
import java.io.InputStream;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
/**
* {@link InputStream} implementation which can be used to write {@link ChannelBuffer}
* objects to and read them
*
*
* @author Norman Maurer
*/
public class BlockingChannelBufferInputStream extends InputStream{
private final Object mutex = new Object();
private final ChannelBuffer buf;
private volatile boolean closed;
private volatile boolean released;
private IOException exception;
public BlockingChannelBufferInputStream() {
buf = ChannelBuffers.dynamicBuffer();
}
@Override
public int available() {
if (released) {
return 0;
}
synchronized (mutex) {
return buf.readableBytes();
}
}
@Override
public void close() {
if (closed) {
return;
}
synchronized (mutex) {
closed = true;
releaseBuffer();
mutex.notifyAll();
}
}
@Override
public int read() throws IOException {
synchronized (mutex) {
if (!waitForData()) {
return -1;
}
return buf.readByte() & 0xff;
}
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
synchronized (mutex) {
if (!waitForData()) {
return -1;
}
int readBytes;
if (len > buf.readableBytes()) {
readBytes = buf.readableBytes();
} else {
readBytes = len;
}
buf.readBytes(b, off, readBytes);
return readBytes;
}
}
private boolean waitForData() throws IOException {
if (released) {
return false;
}
synchronized (mutex) {
while (!released && buf.readableBytes() == 0 && exception == null) {
try {
mutex.wait();
} catch (InterruptedException e) {
IOException ioe = new IOException(
"Interrupted while waiting for more data");
ioe.initCause(e);
throw ioe;
}
}
}
if (exception != null) {
releaseBuffer();
throw exception;
}
if (closed && buf.readableBytes() == 0) {
releaseBuffer();
return false;
}
return true;
}
private void releaseBuffer() {
if (released) {
return;
}
released = true;
}
/**
* Write the {@link ChannelBuffer} to {@link InputStream} and unblock the
* read methods
*
* @param src buffer
*/
public void write(ChannelBuffer src) {
synchronized (mutex) {
if (closed) {
return;
}
if (buf.readable()) {
this.buf.writeBytes(src);
//this.buf.readerIndex(0);
} else {
this.buf.clear();
this.buf.writeBytes(src);
this.buf.readerIndex(0);
mutex.notifyAll();
}
}
}
/**
* Throw the given {@link IOException} on the next read call
*
* @param e
*/
public void throwException(IOException e) {
synchronized (mutex) {
if (exception == null) {
exception = e;
mutex.notifyAll();
}
}
}
}

View File

@ -0,0 +1,79 @@
package org.jboss.netty.handler.stream;
import java.io.IOException;
import java.io.OutputStream;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
/**
* {@link OutputStream} which write data to the wrapped {@link Channel}
*
* @author Norman Maurer
*/
public class ChannelOutputStream extends OutputStream{
private final Channel channel;
private ChannelFuture lastChannelFuture;
public ChannelOutputStream(Channel channel) {
this.channel = channel;
}
@Override
public void close() throws IOException {
try {
flush();
} finally {
channel.close().awaitUninterruptibly();
}
}
private void checkClosed() throws IOException {
if (!channel.isConnected()) {
throw new IOException("The session has been closed.");
}
}
private synchronized void write(ChannelBuffer buf) throws IOException {
checkClosed();
ChannelFuture future = channel.write(buf);
lastChannelFuture = future;
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
write(ChannelBuffers.copiedBuffer(b.clone(), off, len));
}
@Override
public void write(int b) throws IOException {
ChannelBuffer buf = ChannelBuffers.buffer(1);
buf.writeByte((byte) b);
write(buf);
}
@Override
public synchronized void flush() throws IOException {
if (lastChannelFuture == null) {
return;
}
lastChannelFuture.awaitUninterruptibly();
if (!lastChannelFuture.isSuccess()) {
Throwable t = lastChannelFuture.getCause();
if (t != null) {
throw new IOException(
"The bytes could not be written to the session", t);
} else {
throw new IOException(
"The bytes could not be written to the session");
}
}
}
}

View File

@ -0,0 +1,180 @@
package org.jboss.netty.handler.stream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
import org.jboss.netty.util.Timer;
/**
* Abstract base class which could be used if you need to use {@link InputStream} and {@link OutputStream} directly in your Handler.
* Because of the blocking nature of {@link InputStream} it will spawn a new Thread on every new connected {@link Channel}
*
* @author Norman Maurer
*/
public abstract class StreamHandler extends ReadTimeoutHandler{
private final ExecutorService executor;
private static final String KEY_IN = "stream.in";
private static final String KEY_OUT = "stream.out";
/**
* Create a new Instance which use a cached ThreadPool with no limit to perform the stream handling
*
* @param timer the {@link Timer} which is used for the timeout calculation
* @param readTimeout the read timeout. After the timeout a {@link ReadTimeOutException} will be thrown
* @param unit the {@link TimeUnit} to use for the readerIdleTime
*/
public StreamHandler(Timer timer, long readTimeout, TimeUnit unit) {
this(timer, readTimeout, unit, Executors.newCachedThreadPool());
}
/**
* Create a new Instance which use thre give {@link ExecutorService} to perform the stream handling
*
* @param timer the {@link Timer} which is used for the timeout calculation
* @param readTimeout the read timeout in seconds. After the timeout a {@link ReadTimeOutException} will be thrown
* @param unit the {@link TimeUnit} to use for the readerIdleTime
* @param executor the {@link ExecutorService} to use for off load the blocking tasks
*/
public StreamHandler(Timer timer, long readTimeout, TimeUnit unit, ExecutorService executor) {
super(timer, readTimeout, unit);
this.executor = executor;
}
/**
* Implement this method to execute your stream I/O logic
*
* The method will get executed in a new Thread
*
*/
protected abstract void processStreamIo(final ChannelHandlerContext ctx, final InputStream in,
OutputStream out);
/**
* Fire of the {@link #processStreamIo(ChannelHandlerContext, InputStream, OutputStream)} method
*/
@Override
public void channelConnected(final ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
// Create streams
final InputStream in = new BlockingChannelBufferInputStream();
final OutputStream out = new ChannelOutputStream(ctx.getChannel());
Map<Object, Object> attachment = getAttachment(ctx);
attachment.put(KEY_IN, in);
attachment.put(KEY_OUT, out);
executor.execute(new Runnable() {
public void run() {
processStreamIo(ctx, in, out);
}
});
ctx.setAttachment(attachment);
super.channelConnected(ctx, e);
}
/**
* Return the Map which is used as Attachment to the {@link ChannelHandlerContext}
*
* You should use this map if you need to store attachments on the {@link ChannelHandlerContext}
*
* @param ctx
* @return attachmentMap
*/
@SuppressWarnings("unchecked")
protected final Map<Object,Object> getAttachment(ChannelHandlerContext ctx) {
Map<Object,Object> attachment = (Map<Object, Object>) ctx.getAttachment();
if (attachment == null) {
attachment = new HashMap<Object, Object>();
ctx.setAttachment(attachment);
}
return attachment;
}
/**
* Closes streams
*/
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
Map<Object, Object> attachment = getAttachment(ctx);
final InputStream in = (InputStream) attachment.get(KEY_IN);
final OutputStream out = (OutputStream) attachment.get(KEY_OUT);
try {
in.close();
} finally {
out.close();
}
super.channelClosed(ctx, e);
}
/**
* Forwards read data to input stream.
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
final BlockingChannelBufferInputStream in = (BlockingChannelBufferInputStream) getAttachment(ctx).get(KEY_IN);
in.write((ChannelBuffer) e.getMessage());
super.messageReceived(ctx, e);
}
/**
* Forwards caught exceptions to input stream.
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
final BlockingChannelBufferInputStream in = (BlockingChannelBufferInputStream) getAttachment(ctx).get(KEY_IN);
IOException ex = null;
if (e.getCause() instanceof ReadTimeOutException) {
ex = (IOException) e.getCause().getCause();
} else if (e.getCause() instanceof IOException) {
ex = (IOException) e.getCause();
}
if (e != null && in != null) {
in.throwException(ex);
} else {
ctx.getChannel().close();
}
}
@Override
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
throw new ReadTimeOutException(new SocketTimeoutException("Read timeout"));
}
/**
*
* Exception thrown on a read timeout
*
*/
private static class ReadTimeOutException extends RuntimeException {
private static final long serialVersionUID = 3976736960742503222L;
public ReadTimeOutException(IOException cause) {
super(cause);
}
}
}