Remove in favor of IOStreamChannelFactory
This commit is contained in:
parent
a67e550207
commit
aa5510e3b2
@ -1,170 +0,0 @@
|
|||||||
package org.jboss.netty.handler.stream;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffers;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@link InputStream} implementation which can be used to write {@link ChannelBuffer}
|
|
||||||
* objects to and read them
|
|
||||||
*
|
|
||||||
*
|
|
||||||
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
|
||||||
* @author <a href="http://www.murkycloud.com/">Norman Maurer</a>
|
|
||||||
*/
|
|
||||||
public class BlockingChannelBufferInputStream extends InputStream{
|
|
||||||
private final Object mutex = new Object();
|
|
||||||
|
|
||||||
private final ChannelBuffer buf;
|
|
||||||
|
|
||||||
private volatile boolean closed;
|
|
||||||
|
|
||||||
private volatile boolean released;
|
|
||||||
|
|
||||||
private IOException exception;
|
|
||||||
|
|
||||||
public BlockingChannelBufferInputStream() {
|
|
||||||
buf = ChannelBuffers.dynamicBuffer();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int available() {
|
|
||||||
if (released) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized (mutex) {
|
|
||||||
return buf.readableBytes();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
if (closed) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized (mutex) {
|
|
||||||
closed = true;
|
|
||||||
releaseBuffer();
|
|
||||||
|
|
||||||
mutex.notifyAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int read() throws IOException {
|
|
||||||
synchronized (mutex) {
|
|
||||||
if (!waitForData()) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return buf.readByte() & 0xff;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int read(byte[] b, int off, int len) throws IOException {
|
|
||||||
synchronized (mutex) {
|
|
||||||
if (!waitForData()) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int readBytes;
|
|
||||||
|
|
||||||
if (len > buf.readableBytes()) {
|
|
||||||
readBytes = buf.readableBytes();
|
|
||||||
} else {
|
|
||||||
readBytes = len;
|
|
||||||
}
|
|
||||||
|
|
||||||
buf.readBytes(b, off, readBytes);
|
|
||||||
|
|
||||||
return readBytes;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean waitForData() throws IOException {
|
|
||||||
if (released) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized (mutex) {
|
|
||||||
while (!released && buf.readableBytes() == 0 && exception == null) {
|
|
||||||
try {
|
|
||||||
mutex.wait();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
IOException ioe = new IOException(
|
|
||||||
"Interrupted while waiting for more data");
|
|
||||||
ioe.initCause(e);
|
|
||||||
throw ioe;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (exception != null) {
|
|
||||||
releaseBuffer();
|
|
||||||
throw exception;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (closed && buf.readableBytes() == 0) {
|
|
||||||
releaseBuffer();
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void releaseBuffer() {
|
|
||||||
if (released) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
released = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Write the {@link ChannelBuffer} to {@link InputStream} and unblock the
|
|
||||||
* read methods
|
|
||||||
*
|
|
||||||
* @param src buffer
|
|
||||||
*/
|
|
||||||
public void write(ChannelBuffer src) {
|
|
||||||
synchronized (mutex) {
|
|
||||||
if (closed) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (buf.readable()) {
|
|
||||||
|
|
||||||
this.buf.writeBytes(src);
|
|
||||||
//this.buf.readerIndex(0);
|
|
||||||
} else {
|
|
||||||
this.buf.clear();
|
|
||||||
this.buf.writeBytes(src);
|
|
||||||
this.buf.readerIndex(0);
|
|
||||||
mutex.notifyAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Throw the given {@link IOException} on the next read call
|
|
||||||
*
|
|
||||||
* @param e
|
|
||||||
*/
|
|
||||||
public void throwException(IOException e) {
|
|
||||||
synchronized (mutex) {
|
|
||||||
if (exception == null) {
|
|
||||||
exception = e;
|
|
||||||
|
|
||||||
mutex.notifyAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
@ -1,80 +0,0 @@
|
|||||||
package org.jboss.netty.handler.stream;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffers;
|
|
||||||
import org.jboss.netty.channel.Channel;
|
|
||||||
import org.jboss.netty.channel.ChannelFuture;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@link OutputStream} which write data to the wrapped {@link Channel}
|
|
||||||
*
|
|
||||||
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
|
||||||
* @author <a href="http://www.murkycloud.com/">Norman Maurer</a>
|
|
||||||
*/
|
|
||||||
public class ChannelOutputStream extends OutputStream{
|
|
||||||
|
|
||||||
private final Channel channel;
|
|
||||||
|
|
||||||
private ChannelFuture lastChannelFuture;
|
|
||||||
|
|
||||||
public ChannelOutputStream(Channel channel) {
|
|
||||||
this.channel = channel;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() throws IOException {
|
|
||||||
try {
|
|
||||||
flush();
|
|
||||||
} finally {
|
|
||||||
channel.close().awaitUninterruptibly();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void checkClosed() throws IOException {
|
|
||||||
if (!channel.isConnected()) {
|
|
||||||
throw new IOException("The session has been closed.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private synchronized void write(ChannelBuffer buf) throws IOException {
|
|
||||||
checkClosed();
|
|
||||||
ChannelFuture future = channel.write(buf);
|
|
||||||
lastChannelFuture = future;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void write(byte[] b, int off, int len) throws IOException {
|
|
||||||
write(ChannelBuffers.copiedBuffer(b.clone(), off, len));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void write(int b) throws IOException {
|
|
||||||
ChannelBuffer buf = ChannelBuffers.buffer(1);
|
|
||||||
buf.writeByte((byte) b);
|
|
||||||
write(buf);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void flush() throws IOException {
|
|
||||||
if (lastChannelFuture == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
lastChannelFuture.awaitUninterruptibly();
|
|
||||||
if (!lastChannelFuture.isSuccess()) {
|
|
||||||
Throwable t = lastChannelFuture.getCause();
|
|
||||||
if (t != null) {
|
|
||||||
throw new IOException(
|
|
||||||
"The bytes could not be written to the session", t);
|
|
||||||
} else {
|
|
||||||
throw new IOException(
|
|
||||||
"The bytes could not be written to the session");
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,181 +0,0 @@
|
|||||||
package org.jboss.netty.handler.stream;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.net.SocketTimeoutException;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
|
||||||
import org.jboss.netty.channel.Channel;
|
|
||||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
|
||||||
import org.jboss.netty.channel.ChannelStateEvent;
|
|
||||||
import org.jboss.netty.channel.ExceptionEvent;
|
|
||||||
import org.jboss.netty.channel.MessageEvent;
|
|
||||||
import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
|
|
||||||
import org.jboss.netty.util.Timer;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Abstract base class which could be used if you need to use {@link InputStream} and {@link OutputStream} directly in your Handler.
|
|
||||||
* Because of the blocking nature of {@link InputStream} it will spawn a new Thread on every new connected {@link Channel}
|
|
||||||
*
|
|
||||||
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
|
||||||
* @author <a href="http://www.murkycloud.com/">Norman Maurer</a>
|
|
||||||
*/
|
|
||||||
public abstract class StreamHandler extends ReadTimeoutHandler{
|
|
||||||
|
|
||||||
private final ExecutorService executor;
|
|
||||||
|
|
||||||
private static final String KEY_IN = "stream.in";
|
|
||||||
private static final String KEY_OUT = "stream.out";
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new Instance which use a cached ThreadPool with no limit to perform the stream handling
|
|
||||||
*
|
|
||||||
* @param timer the {@link Timer} which is used for the timeout calculation
|
|
||||||
* @param readTimeout the read timeout. After the timeout a {@link ReadTimeOutException} will be thrown
|
|
||||||
* @param unit the {@link TimeUnit} to use for the readerIdleTime
|
|
||||||
*/
|
|
||||||
public StreamHandler(Timer timer, long readTimeout, TimeUnit unit) {
|
|
||||||
this(timer, readTimeout, unit, Executors.newCachedThreadPool());
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new Instance which use thre give {@link ExecutorService} to perform the stream handling
|
|
||||||
*
|
|
||||||
* @param timer the {@link Timer} which is used for the timeout calculation
|
|
||||||
* @param readTimeout the read timeout in seconds. After the timeout a {@link ReadTimeOutException} will be thrown
|
|
||||||
* @param unit the {@link TimeUnit} to use for the readerIdleTime
|
|
||||||
* @param executor the {@link ExecutorService} to use for off load the blocking tasks
|
|
||||||
*/
|
|
||||||
public StreamHandler(Timer timer, long readTimeout, TimeUnit unit, ExecutorService executor) {
|
|
||||||
super(timer, readTimeout, unit);
|
|
||||||
this.executor = executor;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Implement this method to execute your stream I/O logic
|
|
||||||
*
|
|
||||||
* The method will get executed in a new Thread
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
protected abstract void processStreamIo(final ChannelHandlerContext ctx, final InputStream in,
|
|
||||||
OutputStream out);
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Fire of the {@link #processStreamIo(ChannelHandlerContext, InputStream, OutputStream)} method
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void channelConnected(final ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
|
|
||||||
|
|
||||||
// Create streams
|
|
||||||
final InputStream in = new BlockingChannelBufferInputStream();
|
|
||||||
final OutputStream out = new ChannelOutputStream(ctx.getChannel());
|
|
||||||
Map<Object, Object> attachment = getAttachment(ctx);
|
|
||||||
attachment.put(KEY_IN, in);
|
|
||||||
attachment.put(KEY_OUT, out);
|
|
||||||
executor.execute(new Runnable() {
|
|
||||||
|
|
||||||
public void run() {
|
|
||||||
processStreamIo(ctx, in, out);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
ctx.setAttachment(attachment);
|
|
||||||
super.channelConnected(ctx, e);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the Map which is used as Attachment to the {@link ChannelHandlerContext}
|
|
||||||
*
|
|
||||||
* You should use this map if you need to store attachments on the {@link ChannelHandlerContext}
|
|
||||||
*
|
|
||||||
* @param ctx
|
|
||||||
* @return attachmentMap
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
protected final Map<Object,Object> getAttachment(ChannelHandlerContext ctx) {
|
|
||||||
Map<Object,Object> attachment = (Map<Object, Object>) ctx.getAttachment();
|
|
||||||
if (attachment == null) {
|
|
||||||
attachment = new HashMap<Object, Object>();
|
|
||||||
ctx.setAttachment(attachment);
|
|
||||||
}
|
|
||||||
return attachment;
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* Closes streams
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
|
|
||||||
Map<Object, Object> attachment = getAttachment(ctx);
|
|
||||||
|
|
||||||
final InputStream in = (InputStream) attachment.get(KEY_IN);
|
|
||||||
final OutputStream out = (OutputStream) attachment.get(KEY_OUT);
|
|
||||||
try {
|
|
||||||
in.close();
|
|
||||||
} finally {
|
|
||||||
out.close();
|
|
||||||
}
|
|
||||||
super.channelClosed(ctx, e);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Forwards read data to input stream.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
|
|
||||||
final BlockingChannelBufferInputStream in = (BlockingChannelBufferInputStream) getAttachment(ctx).get(KEY_IN);
|
|
||||||
in.write((ChannelBuffer) e.getMessage());
|
|
||||||
super.messageReceived(ctx, e);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Forwards caught exceptions to input stream.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
|
|
||||||
final BlockingChannelBufferInputStream in = (BlockingChannelBufferInputStream) getAttachment(ctx).get(KEY_IN);
|
|
||||||
IOException ex = null;
|
|
||||||
if (e.getCause() instanceof ReadTimeOutException) {
|
|
||||||
ex = (IOException) e.getCause().getCause();
|
|
||||||
} else if (e.getCause() instanceof IOException) {
|
|
||||||
ex = (IOException) e.getCause();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (e != null && in != null) {
|
|
||||||
in.throwException(ex);
|
|
||||||
} else {
|
|
||||||
ctx.getChannel().close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
throw new ReadTimeOutException(new SocketTimeoutException("Read timeout"));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
* Exception thrown on a read timeout
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
private static class ReadTimeOutException extends RuntimeException {
|
|
||||||
private static final long serialVersionUID = 3976736960742503222L;
|
|
||||||
|
|
||||||
public ReadTimeOutException(IOException cause) {
|
|
||||||
super(cause);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user