Replace tabs by spaces

This commit is contained in:
Norman Maurer 2011-11-03 19:19:00 +01:00
parent 220d95fe0d
commit 57a6d39014
3 changed files with 160 additions and 162 deletions

View File

@ -20,29 +20,30 @@ import java.io.OutputStream;
import java.net.SocketAddress; import java.net.SocketAddress;
/** /**
* A {@link java.net.SocketAddress} implementation holding an {@link java.io.InputStream} and an {@link java.io.OutputStream} instance used as * A {@link java.net.SocketAddress} implementation holding an
* "remote" address to connect to with a {@link IOStreamChannel}. * {@link java.io.InputStream} and an {@link java.io.OutputStream} instance used
* * as "remote" address to connect to with a {@link IOStreamChannel}.
*
* @author Daniel Bimschas * @author Daniel Bimschas
* @author Dennis Pfisterer * @author Dennis Pfisterer
*/ */
public class IOStreamAddress extends SocketAddress { public class IOStreamAddress extends SocketAddress {
private final InputStream inputStream; private final InputStream inputStream;
private final OutputStream outputStream; private final OutputStream outputStream;
public IOStreamAddress(final InputStream inputStream, final OutputStream outputStream) { public IOStreamAddress(final InputStream inputStream, final OutputStream outputStream) {
this.inputStream = inputStream; this.inputStream = inputStream;
this.outputStream = outputStream; this.outputStream = outputStream;
} }
public InputStream getInputStream() { public InputStream getInputStream() {
return inputStream; return inputStream;
} }
public OutputStream getOutputStream() { public OutputStream getOutputStream() {
return outputStream; return outputStream;
} }
} }

View File

@ -15,59 +15,59 @@
*/ */
package org.jboss.netty.channel.iostream; package org.jboss.netty.channel.iostream;
import org.jboss.netty.channel.*; import org.jboss.netty.channel.*;
import java.net.SocketAddress; import java.net.SocketAddress;
/** /**
* A channel to an {@link java.io.InputStream} and an {@link java.io.OutputStream}. * A channel to an {@link java.io.InputStream} and an
* * {@link java.io.OutputStream}.
*
* @author Daniel Bimschas * @author Daniel Bimschas
* @author Dennis Pfisterer * @author Dennis Pfisterer
*/ */
public class IOStreamChannel extends AbstractChannel { public class IOStreamChannel extends AbstractChannel {
IOStreamChannel(final ChannelFactory factory, final ChannelPipeline pipeline, final ChannelSink sink) { IOStreamChannel(final ChannelFactory factory, final ChannelPipeline pipeline, final ChannelSink sink) {
super(null, factory, pipeline, sink); super(null, factory, pipeline, sink);
} }
@Override @Override
public ChannelConfig getConfig() { public ChannelConfig getConfig() {
return ((IOStreamChannelSink) getPipeline().getSink()).getConfig(); return ((IOStreamChannelSink) getPipeline().getSink()).getConfig();
} }
@Override @Override
public boolean isBound() { public boolean isBound() {
return ((IOStreamChannelSink) getPipeline().getSink()).isBound(); return ((IOStreamChannelSink) getPipeline().getSink()).isBound();
} }
@Override @Override
public boolean isConnected() { public boolean isConnected() {
return ((IOStreamChannelSink) getPipeline().getSink()).isConnected(); return ((IOStreamChannelSink) getPipeline().getSink()).isConnected();
} }
@Override @Override
public SocketAddress getLocalAddress() { public SocketAddress getLocalAddress() {
return null; return null;
} }
@Override @Override
public SocketAddress getRemoteAddress() { public SocketAddress getRemoteAddress() {
return ((IOStreamChannelSink) getPipeline().getSink()).getRemoteAddress(); return ((IOStreamChannelSink) getPipeline().getSink()).getRemoteAddress();
} }
@Override @Override
public ChannelFuture bind(final SocketAddress localAddress) { public ChannelFuture bind(final SocketAddress localAddress) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public ChannelFuture unbind() { public ChannelFuture unbind() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
void doSetClosed() { void doSetClosed() {
setClosed(); setClosed();
} }
} }

View File

@ -26,153 +26,150 @@ import java.util.concurrent.ExecutorService;
import static org.jboss.netty.channel.Channels.*; import static org.jboss.netty.channel.Channels.*;
/** /**
* A {@link org.jboss.netty.channel.ChannelSink} implementation which reads from an {@link java.io.InputStream} and * A {@link org.jboss.netty.channel.ChannelSink} implementation which reads from
* writes to an {@link java.io.OutputStream}. * an {@link java.io.InputStream} and writes to an {@link java.io.OutputStream}.
* *
* @author Daniel Bimschas * @author Daniel Bimschas
* @author Dennis Pfisterer * @author Dennis Pfisterer
*/ */
public class IOStreamChannelSink extends AbstractChannelSink { public class IOStreamChannelSink extends AbstractChannelSink {
private static class ReadRunnable implements Runnable { private static class ReadRunnable implements Runnable {
private final IOStreamChannelSink channelSink; private final IOStreamChannelSink channelSink;
public ReadRunnable(final IOStreamChannelSink channelSink) { public ReadRunnable(final IOStreamChannelSink channelSink) {
this.channelSink = channelSink; this.channelSink = channelSink;
} }
@Override @Override
public void run() { public void run() {
PushbackInputStream in = channelSink.inputStream; PushbackInputStream in = channelSink.inputStream;
while (channelSink.channel.isOpen()) { while (channelSink.channel.isOpen()) {
byte[] buf; byte[] buf;
int readBytes; int readBytes;
try { try {
int bytesToRead = in.available(); int bytesToRead = in.available();
if (bytesToRead > 0) { if (bytesToRead > 0) {
buf = new byte[bytesToRead]; buf = new byte[bytesToRead];
readBytes = in.read(buf); readBytes = in.read(buf);
} else { } else {
// peek into the stream if it was closed (value=-1) // peek into the stream if it was closed (value=-1)
int b = in.read(); int b = in.read();
if (b < 0) { if (b < 0) {
break; break;
} }
// push back the byte which was read too much // push back the byte which was read too much
in.unread(b); in.unread(b);
continue; continue;
} }
} catch (Throwable t) { } catch (Throwable t) {
if (!channelSink.channel.getCloseFuture().isDone()) { if (!channelSink.channel.getCloseFuture().isDone()) {
fireExceptionCaught(channelSink.channel, t); fireExceptionCaught(channelSink.channel, t);
} }
break; break;
} }
fireMessageReceived(channelSink.channel, ChannelBuffers.wrappedBuffer(buf, 0, readBytes)); fireMessageReceived(channelSink.channel, ChannelBuffers.wrappedBuffer(buf, 0, readBytes));
} }
// Clean up. // Clean up.
close(channelSink.channel); close(channelSink.channel);
} }
} }
private final ExecutorService executorService; private final ExecutorService executorService;
private IOStreamChannel channel; private IOStreamChannel channel;
public IOStreamChannelSink(final ExecutorService executorService) { public IOStreamChannelSink(final ExecutorService executorService) {
this.executorService = executorService; this.executorService = executorService;
} }
public boolean isConnected() { public boolean isConnected() {
return inputStream != null && outputStream != null; return inputStream != null && outputStream != null;
} }
public IOStreamAddress getRemoteAddress() { public IOStreamAddress getRemoteAddress() {
return remoteAddress; return remoteAddress;
} }
public boolean isBound() { public boolean isBound() {
return false; return false;
} }
public ChannelConfig getConfig() { public ChannelConfig getConfig() {
return config; return config;
} }
public void setChannel(final IOStreamChannel channel) { public void setChannel(final IOStreamChannel channel) {
this.channel = channel; this.channel = channel;
} }
private IOStreamAddress remoteAddress; private IOStreamAddress remoteAddress;
private OutputStream outputStream; private OutputStream outputStream;
private PushbackInputStream inputStream; private PushbackInputStream inputStream;
private ChannelConfig config = new DefaultChannelConfig(); private ChannelConfig config = new DefaultChannelConfig();
@Override @Override
public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
final ChannelFuture future = e.getFuture(); final ChannelFuture future = e.getFuture();
if (e instanceof ChannelStateEvent) { if (e instanceof ChannelStateEvent) {
final ChannelStateEvent stateEvent = (ChannelStateEvent) e; final ChannelStateEvent stateEvent = (ChannelStateEvent) e;
final ChannelState state = stateEvent.getState(); final ChannelState state = stateEvent.getState();
final Object value = stateEvent.getValue(); final Object value = stateEvent.getValue();
switch (state) { switch (state) {
case OPEN: case OPEN:
if (Boolean.FALSE.equals(value)) { if (Boolean.FALSE.equals(value)) {
outputStream = null; outputStream = null;
inputStream = null; inputStream = null;
((IOStreamChannel) e.getChannel()).doSetClosed(); ((IOStreamChannel) e.getChannel()).doSetClosed();
} }
break; break;
case BOUND: case BOUND:
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
case CONNECTED: case CONNECTED:
if (value != null) { if (value != null) {
remoteAddress = (IOStreamAddress) value; remoteAddress = (IOStreamAddress) value;
outputStream = remoteAddress.getOutputStream(); outputStream = remoteAddress.getOutputStream();
inputStream = new PushbackInputStream(remoteAddress.getInputStream()); inputStream = new PushbackInputStream(remoteAddress.getInputStream());
executorService.execute(new ReadRunnable(this)); executorService.execute(new ReadRunnable(this));
future.setSuccess(); future.setSuccess();
} }
break; break;
case INTEREST_OPS: case INTEREST_OPS:
// TODO implement // TODO implement
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
} else if (e instanceof MessageEvent) { } else if (e instanceof MessageEvent) {
final MessageEvent event = (MessageEvent) e; final MessageEvent event = (MessageEvent) e;
if (event.getMessage() instanceof ChannelBuffer) { if (event.getMessage() instanceof ChannelBuffer) {
final ChannelBuffer buffer = (ChannelBuffer) event.getMessage(); final ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
buffer.readBytes(outputStream, buffer.readableBytes()); buffer.readBytes(outputStream, buffer.readableBytes());
outputStream.flush(); outputStream.flush();
future.setSuccess(); future.setSuccess();
} else { } else {
throw new IllegalArgumentException( throw new IllegalArgumentException("Only ChannelBuffer objects are supported to be written onto the IOStreamChannelSink! " + "Please check if the encoder pipeline is configured correctly.");
"Only ChannelBuffer objects are supported to be written onto the IOStreamChannelSink! " }
+ "Please check if the encoder pipeline is configured correctly." }
); }
}
}
}
} }