diff --git a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java index 6f0c7b1117..c9e5835773 100644 --- a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java +++ b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java @@ -18,35 +18,24 @@ package io.netty.channel.rxtx; import gnu.io.CommPort; import gnu.io.CommPortIdentifier; import gnu.io.SerialPort; -import io.netty.buffer.BufType; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelMetadata; -import io.netty.channel.socket.oio.AbstractOioByteChannel; +import io.netty.channel.socket.oio.StreamOioByteChannel; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.SocketAddress; -import java.net.SocketTimeoutException; -import java.nio.channels.NotYetConnectedException; import static io.netty.channel.rxtx.RxtxChannelOption.*; /** * A channel to a serial device using the RXTX library. */ -public class RxtxChannel extends AbstractOioByteChannel { +public class RxtxChannel extends StreamOioByteChannel { private static final RxtxDeviceAddress LOCAL_ADDRESS = new RxtxDeviceAddress("localhost"); - private static final ChannelMetadata METADATA = new ChannelMetadata(BufType.BYTE, true); private final RxtxChannelConfig config; private boolean open = true; private RxtxDeviceAddress deviceAddress; private SerialPort serialPort; - private InputStream in; - private OutputStream out; public RxtxChannel() { super(null, null); @@ -59,47 +48,11 @@ public class RxtxChannel extends AbstractOioByteChannel { return config; } - @Override - public ChannelMetadata metadata() { - return METADATA; - } - @Override public boolean isOpen() { return open; } - @Override - public boolean isActive() { - return in != null && out != null; - } - - @Override - protected int available() { - try { - return in.available(); - } catch (IOException e) { - return 0; - } - } - - @Override - protected int doReadBytes(ByteBuf buf) throws Exception { - try { - return buf.writeBytes(in, buf.writableBytes()); - } catch (SocketTimeoutException e) { - return 0; - } - } - - @Override - protected void doWriteBytes(ByteBuf buf) throws Exception { - if (out == null) { - throw new NotYetConnectedException(); - } - buf.readBytes(out, buf.readableBytes()); - } - @Override protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { RxtxDeviceAddress remote = (RxtxDeviceAddress) remoteAddress; @@ -118,8 +71,7 @@ public class RxtxChannel extends AbstractOioByteChannel { serialPort.setDTR(config().getOption(DTR)); serialPort.setRTS(config().getOption(RTS)); - out = serialPort.getOutputStream(); - in = serialPort.getInputStream(); + activate(serialPort.getInputStream(), serialPort.getOutputStream()); } @Override @@ -155,36 +107,14 @@ public class RxtxChannel extends AbstractOioByteChannel { @Override protected void doClose() throws Exception { open = false; - - IOException ex = null; - try { - if (in != null) { - in.close(); + super.doClose(); + } finally { + if (serialPort != null) { + serialPort.removeEventListener(); + serialPort.close(); + serialPort = null; } - } catch (IOException e) { - ex = e; - } - - try { - if (out != null) { - out.close(); - } - } catch (IOException e) { - ex = e; - } - - if (serialPort != null) { - serialPort.removeEventListener(); - serialPort.close(); - } - - in = null; - out = null; - serialPort = null; - - if (ex != null) { - throw ex; } } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java index fd3e2632a2..ec24046838 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java @@ -15,8 +15,10 @@ */ package io.netty.channel.socket.oio; +import io.netty.buffer.BufType; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.ChannelInputShutdownEvent; @@ -29,6 +31,7 @@ import java.io.IOException; public abstract class AbstractOioByteChannel extends AbstractOioChannel { private volatile boolean inputShutdown; + private static final ChannelMetadata METADATA = new ChannelMetadata(BufType.BYTE, false); /** * @see AbstractOioByteChannel#AbstractOioByteChannel(Channel, Integer) @@ -41,6 +44,11 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { return inputShutdown; } + @Override + public ChannelMetadata metadata() { + return METADATA; + } + @Override protected void doRead() { if (inputShutdown) { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java index 38fc1a52ab..8cc924bda9 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java @@ -15,15 +15,12 @@ */ package io.netty.channel.socket.oio; -import io.netty.buffer.BufType; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; -import io.netty.channel.FileRegion; import io.netty.channel.socket.DefaultSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; @@ -32,32 +29,22 @@ import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; import java.net.SocketTimeoutException; -import java.nio.channels.Channels; -import java.nio.channels.NotYetConnectedException; -import java.nio.channels.WritableByteChannel; /** * A {@link SocketChannel} which is using Old-Blocking-IO */ -public class OioSocketChannel extends AbstractOioByteChannel +public class OioSocketChannel extends StreamOioByteChannel implements SocketChannel { private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioSocketChannel.class); - private static final ChannelMetadata METADATA = new ChannelMetadata(BufType.BYTE, false); - private final Socket socket; private final SocketChannelConfig config; - private InputStream is; - private OutputStream os; - private WritableByteChannel outChannel; /** * Create a new instance with an new {@link Socket} @@ -91,8 +78,7 @@ public class OioSocketChannel extends AbstractOioByteChannel boolean success = false; try { if (socket.isConnected()) { - is = socket.getInputStream(); - os = socket.getOutputStream(); + activate(socket.getInputStream(), socket.getOutputStream()); } socket.setSoTimeout(SO_TIMEOUT); success = true; @@ -114,11 +100,6 @@ public class OioSocketChannel extends AbstractOioByteChannel return (ServerSocketChannel) super.parent(); } - @Override - public ChannelMetadata metadata() { - return METADATA; - } - @Override public SocketChannelConfig config() { return config; @@ -149,6 +130,18 @@ public class OioSocketChannel extends AbstractOioByteChannel return shutdownOutput(newPromise()); } + @Override + protected int doReadBytes(ByteBuf buf) throws Exception { + if (socket.isClosed()) { + return -1; + } + try { + return super.doReadBytes(buf); + } catch (SocketTimeoutException e) { + return 0; + } + } + @Override public ChannelFuture shutdownOutput(final ChannelPromise future) { EventLoop loop = eventLoop(); @@ -205,8 +198,7 @@ public class OioSocketChannel extends AbstractOioByteChannel boolean success = false; try { socket.connect(remoteAddress, config().getConnectTimeoutMillis()); - is = socket.getInputStream(); - os = socket.getOutputStream(); + activate(socket.getInputStream(), socket.getOutputStream()); success = true; } finally { if (!success) { @@ -224,62 +216,4 @@ public class OioSocketChannel extends AbstractOioByteChannel protected void doClose() throws Exception { socket.close(); } - - @Override - protected int available() { - try { - return is.available(); - } catch (IOException e) { - return 0; - } - } - - @Override - protected int doReadBytes(ByteBuf buf) throws Exception { - if (socket.isClosed()) { - return -1; - } - - try { - return buf.writeBytes(is, buf.writableBytes()); - } catch (SocketTimeoutException e) { - return 0; - } - } - - @Override - protected void doWriteBytes(ByteBuf buf) throws Exception { - OutputStream os = this.os; - if (os == null) { - throw new NotYetConnectedException(); - } - buf.readBytes(os, buf.readableBytes()); - } - - @Override - protected void doFlushFileRegion(FileRegion region, ChannelPromise promise) throws Exception { - OutputStream os = this.os; - if (os == null) { - throw new NotYetConnectedException(); - } - if (outChannel == null) { - outChannel = Channels.newChannel(os); - } - long written = 0; - - for (;;) { - long localWritten = region.transferTo(outChannel, written); - if (localWritten == -1) { - checkEOF(region, written); - region.close(); - promise.setSuccess(); - return; - } - written += localWritten; - if (written >= region.count()) { - promise.setSuccess(); - return; - } - } - } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/StreamOioByteChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/StreamOioByteChannel.java new file mode 100644 index 0000000000..7d7730dc44 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/oio/StreamOioByteChannel.java @@ -0,0 +1,159 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.oio; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelPromise; +import io.netty.channel.FileRegion; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.NotYetConnectedException; +import java.nio.channels.WritableByteChannel; + +/** + * Abstract base class for OIO Channels that are based on streams. + */ +public abstract class StreamOioByteChannel extends AbstractOioByteChannel { + + private InputStream is; + private OutputStream os; + private WritableByteChannel outChannel; + + /** + * Create a new instance + * + * @param parent the parent {@link Channel} which was used to create this instance. This can be null if the + * {@link} has no parent as it was created by your self. + * @param id the id which should be used for this instance or {@code null} if a new one should be generated + */ + protected StreamOioByteChannel(Channel parent, Integer id) { + super(parent, id); + } + + /** + * Activate this instance. After this call {@link #isActive()} will return {@code true}. + */ + protected final void activate(InputStream is, OutputStream os) { + if (this.is != null) { + throw new IllegalStateException("input was set already"); + } + if (this.os != null) { + throw new IllegalStateException("output was set already"); + } + if (is == null) { + throw new NullPointerException("is"); + } + if (os == null) { + throw new NullPointerException("os"); + } + this.is = is; + this.os = os; + } + + @Override + public boolean isActive() { + return is != null && os != null; + } + + @Override + protected int available() { + try { + return is.available(); + } catch (IOException e) { + return 0; + } + } + + @Override + protected int doReadBytes(ByteBuf buf) throws Exception { + int length = available(); + if (length < 1) { + length = 1; + } + if (length > buf.writableBytes()) { + length = buf.writableBytes(); + } + return buf.writeBytes(is, length); + } + + @Override + protected void doWriteBytes(ByteBuf buf) throws Exception { + OutputStream os = this.os; + if (os == null) { + throw new NotYetConnectedException(); + } + buf.readBytes(os, buf.readableBytes()); + } + + @Override + protected void doFlushFileRegion(FileRegion region, ChannelPromise promise) throws Exception { + OutputStream os = this.os; + if (os == null) { + throw new NotYetConnectedException(); + } + if (outChannel == null) { + outChannel = Channels.newChannel(os); + } + long written = 0; + + for (;;) { + long localWritten = region.transferTo(outChannel, written); + if (localWritten == -1) { + checkEOF(region, written); + region.close(); + promise.setSuccess(); + return; + } + written += localWritten; + if (written >= region.count()) { + promise.setSuccess(); + return; + } + } + } + + @Override + protected void doClose() throws Exception { + IOException ex = null; + + try { + if (is != null) { + is.close(); + } + } catch (IOException e) { + ex = e; + } + + try { + if (os != null) { + os.close(); + } + } catch (IOException e) { + ex = e; + } + + is = null; + os = null; + + if (ex != null) { + throw ex; + } + } +}