[#962] Read data as soon as it is present in OIO and not wait till it match Buffer.writableBytes()

- Also add a new abstract class called StreamOioByteChannel which can be used by OIO channel implementation which are Stream based as a starting point.
This commit is contained in:
Norman Maurer 2013-01-21 10:14:21 +01:00
parent b20e597217
commit 082b5f0dff
4 changed files with 191 additions and 160 deletions

View File

@ -18,35 +18,24 @@ package io.netty.channel.rxtx;
import gnu.io.CommPort;
import gnu.io.CommPortIdentifier;
import gnu.io.SerialPort;
import io.netty.buffer.BufType;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.socket.oio.AbstractOioByteChannel;
import io.netty.channel.socket.oio.StreamOioByteChannel;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.NotYetConnectedException;
import static io.netty.channel.rxtx.RxtxChannelOption.*;
/**
* A channel to a serial device using the RXTX library.
*/
public class RxtxChannel extends AbstractOioByteChannel {
public class RxtxChannel extends StreamOioByteChannel {
private static final RxtxDeviceAddress LOCAL_ADDRESS = new RxtxDeviceAddress("localhost");
private static final ChannelMetadata METADATA = new ChannelMetadata(BufType.BYTE, true);
private final RxtxChannelConfig config;
private boolean open = true;
private RxtxDeviceAddress deviceAddress;
private SerialPort serialPort;
private InputStream in;
private OutputStream out;
public RxtxChannel() {
super(null, null);
@ -59,47 +48,11 @@ public class RxtxChannel extends AbstractOioByteChannel {
return config;
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
public boolean isOpen() {
return open;
}
@Override
public boolean isActive() {
return in != null && out != null;
}
@Override
protected int available() {
try {
return in.available();
} catch (IOException e) {
return 0;
}
}
@Override
protected int doReadBytes(ByteBuf buf) throws Exception {
try {
return buf.writeBytes(in, buf.writableBytes());
} catch (SocketTimeoutException e) {
return 0;
}
}
@Override
protected void doWriteBytes(ByteBuf buf) throws Exception {
if (out == null) {
throw new NotYetConnectedException();
}
buf.readBytes(out, buf.readableBytes());
}
@Override
protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
RxtxDeviceAddress remote = (RxtxDeviceAddress) remoteAddress;
@ -118,8 +71,7 @@ public class RxtxChannel extends AbstractOioByteChannel {
serialPort.setDTR(config().getOption(DTR));
serialPort.setRTS(config().getOption(RTS));
out = serialPort.getOutputStream();
in = serialPort.getInputStream();
activate(serialPort.getInputStream(), serialPort.getOutputStream());
}
@Override
@ -155,36 +107,14 @@ public class RxtxChannel extends AbstractOioByteChannel {
@Override
protected void doClose() throws Exception {
open = false;
IOException ex = null;
try {
if (in != null) {
in.close();
super.doClose();
} finally {
if (serialPort != null) {
serialPort.removeEventListener();
serialPort.close();
serialPort = null;
}
} catch (IOException e) {
ex = e;
}
try {
if (out != null) {
out.close();
}
} catch (IOException e) {
ex = e;
}
if (serialPort != null) {
serialPort.removeEventListener();
serialPort.close();
}
in = null;
out = null;
serialPort = null;
if (ex != null) {
throw ex;
}
}
}

View File

@ -15,8 +15,10 @@
*/
package io.netty.channel.socket.oio;
import io.netty.buffer.BufType;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.ChannelInputShutdownEvent;
@ -29,6 +31,7 @@ import java.io.IOException;
public abstract class AbstractOioByteChannel extends AbstractOioChannel {
private volatile boolean inputShutdown;
private static final ChannelMetadata METADATA = new ChannelMetadata(BufType.BYTE, false);
/**
* @see AbstractOioByteChannel#AbstractOioByteChannel(Channel, Integer)
@ -41,6 +44,11 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
return inputShutdown;
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
protected void doRead() {
if (inputShutdown) {

View File

@ -15,15 +15,12 @@
*/
package io.netty.channel.socket.oio;
import io.netty.buffer.BufType;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.FileRegion;
import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
@ -32,32 +29,22 @@ import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.Channels;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.WritableByteChannel;
/**
* A {@link SocketChannel} which is using Old-Blocking-IO
*/
public class OioSocketChannel extends AbstractOioByteChannel
public class OioSocketChannel extends StreamOioByteChannel
implements SocketChannel {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(OioSocketChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(BufType.BYTE, false);
private final Socket socket;
private final SocketChannelConfig config;
private InputStream is;
private OutputStream os;
private WritableByteChannel outChannel;
/**
* Create a new instance with an new {@link Socket}
@ -91,8 +78,7 @@ public class OioSocketChannel extends AbstractOioByteChannel
boolean success = false;
try {
if (socket.isConnected()) {
is = socket.getInputStream();
os = socket.getOutputStream();
activate(socket.getInputStream(), socket.getOutputStream());
}
socket.setSoTimeout(SO_TIMEOUT);
success = true;
@ -114,11 +100,6 @@ public class OioSocketChannel extends AbstractOioByteChannel
return (ServerSocketChannel) super.parent();
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
public SocketChannelConfig config() {
return config;
@ -149,6 +130,18 @@ public class OioSocketChannel extends AbstractOioByteChannel
return shutdownOutput(newPromise());
}
@Override
protected int doReadBytes(ByteBuf buf) throws Exception {
if (socket.isClosed()) {
return -1;
}
try {
return super.doReadBytes(buf);
} catch (SocketTimeoutException e) {
return 0;
}
}
@Override
public ChannelFuture shutdownOutput(final ChannelPromise future) {
EventLoop loop = eventLoop();
@ -205,8 +198,7 @@ public class OioSocketChannel extends AbstractOioByteChannel
boolean success = false;
try {
socket.connect(remoteAddress, config().getConnectTimeoutMillis());
is = socket.getInputStream();
os = socket.getOutputStream();
activate(socket.getInputStream(), socket.getOutputStream());
success = true;
} finally {
if (!success) {
@ -224,62 +216,4 @@ public class OioSocketChannel extends AbstractOioByteChannel
protected void doClose() throws Exception {
socket.close();
}
@Override
protected int available() {
try {
return is.available();
} catch (IOException e) {
return 0;
}
}
@Override
protected int doReadBytes(ByteBuf buf) throws Exception {
if (socket.isClosed()) {
return -1;
}
try {
return buf.writeBytes(is, buf.writableBytes());
} catch (SocketTimeoutException e) {
return 0;
}
}
@Override
protected void doWriteBytes(ByteBuf buf) throws Exception {
OutputStream os = this.os;
if (os == null) {
throw new NotYetConnectedException();
}
buf.readBytes(os, buf.readableBytes());
}
@Override
protected void doFlushFileRegion(FileRegion region, ChannelPromise promise) throws Exception {
OutputStream os = this.os;
if (os == null) {
throw new NotYetConnectedException();
}
if (outChannel == null) {
outChannel = Channels.newChannel(os);
}
long written = 0;
for (;;) {
long localWritten = region.transferTo(outChannel, written);
if (localWritten == -1) {
checkEOF(region, written);
region.close();
promise.setSuccess();
return;
}
written += localWritten;
if (written >= region.count()) {
promise.setSuccess();
return;
}
}
}
}

View File

@ -0,0 +1,159 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import io.netty.channel.FileRegion;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.WritableByteChannel;
/**
* Abstract base class for OIO Channels that are based on streams.
*/
public abstract class StreamOioByteChannel extends AbstractOioByteChannel {
private InputStream is;
private OutputStream os;
private WritableByteChannel outChannel;
/**
* Create a new instance
*
* @param parent the parent {@link Channel} which was used to create this instance. This can be null if the
* {@link} has no parent as it was created by your self.
* @param id the id which should be used for this instance or {@code null} if a new one should be generated
*/
protected StreamOioByteChannel(Channel parent, Integer id) {
super(parent, id);
}
/**
* Activate this instance. After this call {@link #isActive()} will return {@code true}.
*/
protected final void activate(InputStream is, OutputStream os) {
if (this.is != null) {
throw new IllegalStateException("input was set already");
}
if (this.os != null) {
throw new IllegalStateException("output was set already");
}
if (is == null) {
throw new NullPointerException("is");
}
if (os == null) {
throw new NullPointerException("os");
}
this.is = is;
this.os = os;
}
@Override
public boolean isActive() {
return is != null && os != null;
}
@Override
protected int available() {
try {
return is.available();
} catch (IOException e) {
return 0;
}
}
@Override
protected int doReadBytes(ByteBuf buf) throws Exception {
int length = available();
if (length < 1) {
length = 1;
}
if (length > buf.writableBytes()) {
length = buf.writableBytes();
}
return buf.writeBytes(is, length);
}
@Override
protected void doWriteBytes(ByteBuf buf) throws Exception {
OutputStream os = this.os;
if (os == null) {
throw new NotYetConnectedException();
}
buf.readBytes(os, buf.readableBytes());
}
@Override
protected void doFlushFileRegion(FileRegion region, ChannelPromise promise) throws Exception {
OutputStream os = this.os;
if (os == null) {
throw new NotYetConnectedException();
}
if (outChannel == null) {
outChannel = Channels.newChannel(os);
}
long written = 0;
for (;;) {
long localWritten = region.transferTo(outChannel, written);
if (localWritten == -1) {
checkEOF(region, written);
region.close();
promise.setSuccess();
return;
}
written += localWritten;
if (written >= region.count()) {
promise.setSuccess();
return;
}
}
}
@Override
protected void doClose() throws Exception {
IOException ex = null;
try {
if (is != null) {
is.close();
}
} catch (IOException e) {
ex = e;
}
try {
if (os != null) {
os.close();
}
} catch (IOException e) {
ex = e;
}
is = null;
os = null;
if (ex != null) {
throw ex;
}
}
}