Fixed NETTY-21 and NETTY-22:

* ChannelBuffer.setBytes() and writeBytes() doesn't work when a NIO channel is specified.
* ChannelBuffer.setBytes() and writeBytes() should return -1 if the connection is closed.
* Changed NioWorker.read() to use ChannelBuffer.writeBytes()
This commit is contained in:
Trustin Lee 2008-08-20 03:09:23 +00:00
parent edd969b384
commit 66f7d5a800
10 changed files with 104 additions and 52 deletions

View File

@ -22,7 +22,6 @@
*/
package org.jboss.netty.buffer;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -201,39 +200,69 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
return out.write((ByteBuffer) buffer.duplicate().position(index).limit(index + length));
}
public void setBytes(int index, InputStream in, int length)
public int setBytes(int index, InputStream in, int length)
throws IOException {
if (length == 0) {
return;
return 0;
}
int readBytes = 0;
if (!buffer.isReadOnly() && buffer.hasArray()) {
index += buffer.arrayOffset();
do {
int readBytes = in.read(
buffer.array(), index, length);
if (readBytes < 0) {
throw new EOFException();
int localReadBytes = in.read(buffer.array(), index, length);
if (localReadBytes < 0) {
if (readBytes == 0) {
return -1;
} else {
break;
}
}
index += readBytes;
length -= readBytes;
readBytes += localReadBytes;
index += localReadBytes;
length -= localReadBytes;
} while (length > 0);
} else {
byte[] tmp = new byte[length];
for (int i = 0; i < tmp.length;) {
int readBytes = in.read(tmp, i, tmp.length - i);
if (readBytes < 0) {
throw new EOFException();
int localReadBytes = in.read(tmp, i, tmp.length - i);
if (localReadBytes < 0) {
if (readBytes == 0) {
return -1;
} else {
break;
}
}
i += readBytes;
}
((ByteBuffer) buffer.duplicate().position(index)).get(tmp);
}
return readBytes;
}
public int setBytes(int index, ScatteringByteChannel in, int length)
throws IOException {
return in.read((ByteBuffer) buffer.duplicate().limit(index + length).position(index));
ByteBuffer slice = (ByteBuffer) buffer.duplicate().limit(index + length).position(index);
int readBytes = 0;
while (readBytes < length) {
int localReadBytes = in.read(slice);
if (localReadBytes < 0) {
if (readBytes == 0) {
return -1;
} else {
return readBytes;
}
} else if (localReadBytes == 0) {
break;
}
readBytes += localReadBytes;
}
return readBytes;
}
public ByteBuffer toByteBuffer(int index, int length) {

View File

@ -769,13 +769,16 @@ public interface ChannelBuffer extends Comparable<ChannelBuffer> {
*
* @param length the number of bytes to transfer
*
* @return the actual number of bytes read in from the specified channel.
* {@code -1} if the specified channel is closed.
*
* @throws IndexOutOfBoundsException
* if the specified {@code index} is less than {@code 0} or
* if {@code index + length} is greater than {@code this.capacity}
* @throws IOException
* if the specified stream threw an exception during I/O
*/
void setBytes(int index, InputStream in, int length) throws IOException;
int setBytes(int index, InputStream in, int length) throws IOException;
/**
* Transfers the content of the specified source channel to this buffer
@ -783,7 +786,8 @@ public interface ChannelBuffer extends Comparable<ChannelBuffer> {
*
* @param length the maximum number of bytes to transfer
*
* @return the actual number of bytes read in from the specified channel
* @return the actual number of bytes read in from the specified channel.
* {@code -1} if the specified channel is closed.
*
* @throws IndexOutOfBoundsException
* if the specified {@code index} is less than {@code 0} or

View File

@ -343,7 +343,7 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer {
}
}
public void setBytes(int index, InputStream in, int length)
public int setBytes(int index, InputStream in, int length)
throws IOException {
int sliceId = sliceId(index);
if (index + length >= capacity()) {
@ -351,15 +351,26 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer {
}
int i = sliceId;
int readBytes = 0;
while (length > 0) {
ChannelBuffer s = slices[i];
int adjustment = indices[i];
int localLength = Math.min(length, s.capacity() - (index - adjustment));
s.setBytes(index - adjustment, in, localLength);
int localReadBytes = s.setBytes(index - adjustment, in, localLength);
if (localReadBytes < 0) {
if (readBytes == 0) {
return -1;
} else {
break;
}
}
index += localLength;
length -= localLength;
i ++;
}
return readBytes;
}
public int setBytes(int index, ScatteringByteChannel in, int length)

View File

@ -156,9 +156,9 @@ public class DuplicatedChannelBuffer extends AbstractChannelBuffer implements Wr
return buffer.getBytes(index, out, length);
}
public void setBytes(int index, InputStream in, int length)
public int setBytes(int index, InputStream in, int length)
throws IOException {
buffer.setBytes(index, in, length);
return buffer.setBytes(index, in, length);
}
public int setBytes(int index, ScatteringByteChannel in, int length)

View File

@ -144,9 +144,9 @@ public class DynamicChannelBuffer extends AbstractChannelBuffer {
buffer.setBytes(index, src);
}
public void setBytes(int index, InputStream in, int length)
public int setBytes(int index, InputStream in, int length)
throws IOException {
buffer.setBytes(index, in, length);
return buffer.setBytes(index, in, length);
}
public int setBytes(int index, ScatteringByteChannel in, int length)

View File

@ -22,7 +22,6 @@
*/
package org.jboss.netty.buffer;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -114,29 +113,44 @@ public abstract class HeapChannelBuffer extends AbstractChannelBuffer {
src.get(array, index, src.remaining());
}
public void setBytes(int index, InputStream in, int length) throws IOException {
public int setBytes(int index, InputStream in, int length) throws IOException {
int readBytes = 0;
while (length > 0) {
int readBytes = in.read(array, index, length);
if (readBytes < 0) {
throw new EOFException();
int localReadBytes = in.read(array, index, length);
if (localReadBytes < 0) {
if (readBytes == 0) {
return -1;
} else {
break;
}
}
index += readBytes;
length -= readBytes;
readBytes += localReadBytes;
index += localReadBytes;
length -= localReadBytes;
}
return readBytes;
}
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
ByteBuffer buf = ByteBuffer.wrap(array, index, length);
while (length > 0) {
int readBytes = in.read(buf);
if (readBytes < 0) {
throw new EOFException();
} else if (readBytes == 0) {
int readBytes = 0;
while (readBytes < length) {
int localReadBytes = in.read(buf);
if (localReadBytes < 0) {
if (readBytes == 0) {
return -1;
} else {
break;
}
} else if (localReadBytes == 0) {
break;
}
readBytes += localReadBytes;
}
return buf.flip().remaining();
return readBytes;
}
public ChannelBuffer slice(int index, int length) {

View File

@ -101,9 +101,10 @@ public class ReadOnlyChannelBuffer extends AbstractChannelBuffer implements Wrap
rejectModification();
}
public void setBytes(int index, InputStream in, int length)
public int setBytes(int index, InputStream in, int length)
throws IOException {
rejectModification();
return 0;
}
public int setBytes(int index, ScatteringByteChannel in, int length)

View File

@ -178,10 +178,10 @@ public class SlicedChannelBuffer extends AbstractChannelBuffer implements Wrappe
return buffer.getBytes(index + adjustment, out, length);
}
public void setBytes(int index, InputStream in, int length)
public int setBytes(int index, InputStream in, int length)
throws IOException {
checkIndex(index, length);
buffer.setBytes(index + adjustment, in, length);
return buffer.setBytes(index + adjustment, in, length);
}
public int setBytes(int index, ScatteringByteChannel in, int length)

View File

@ -172,10 +172,10 @@ public class TruncatedChannelBuffer extends AbstractChannelBuffer implements Wra
return buffer.getBytes(index, out, length);
}
public void setBytes(int index, InputStream in, int length)
public int setBytes(int index, InputStream in, int length)
throws IOException {
checkIndex(index, length);
buffer.setBytes(index, in, length);
return buffer.setBytes(index, in, length);
}
public int setBytes(int index, ScatteringByteChannel in, int length)

View File

@ -25,9 +25,8 @@ package org.jboss.netty.channel.socket.nio;
import static org.jboss.netty.channel.Channels.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
@ -219,20 +218,20 @@ class NioWorker implements Runnable {
}
private static void read(SelectionKey k) {
ReadableByteChannel ch = (ReadableByteChannel) k.channel();
ScatteringByteChannel ch = (ScatteringByteChannel) k.channel();
NioSocketChannel channel = (NioSocketChannel) k.attachment();
ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
ByteBuffer buf = ByteBuffer.allocate(predictor.nextReceiveBufferSize());
ChannelBuffer buf = ChannelBuffers.buffer(predictor.nextReceiveBufferSize());
int ret = 0;
int readBytes = 0;
boolean failure = true;
try {
while ((ret = ch.read(buf)) > 0) {
while ((ret = buf.writeBytes(ch, buf.writableBytes())) > 0) {
readBytes += ret;
if (!buf.hasRemaining()) {
if (!buf.writable()) {
break;
}
}
@ -246,13 +245,7 @@ class NioWorker implements Runnable {
predictor.previousReceiveBufferSize(readBytes);
// Fire the event.
ChannelBuffer buffer;
if (readBytes == buf.capacity()) {
buffer = ChannelBuffers.wrappedBuffer(buf.array());
} else {
buffer = ChannelBuffers.wrappedBuffer(buf.array(), 0, readBytes);
}
fireMessageReceived(channel, buffer);
fireMessageReceived(channel, buf);
}
if (ret < 0 || failure) {