From 66f7d5a800517dcb788c736c2ef482e669c7de15 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Wed, 20 Aug 2008 03:09:23 +0000 Subject: [PATCH] Fixed NETTY-21 and NETTY-22: * ChannelBuffer.setBytes() and writeBytes() doesn't work when a NIO channel is specified. * ChannelBuffer.setBytes() and writeBytes() should return -1 if the connection is closed. * Changed NioWorker.read() to use ChannelBuffer.writeBytes() --- .../buffer/ByteBufferBackedChannelBuffer.java | 55 ++++++++++++++----- .../org/jboss/netty/buffer/ChannelBuffer.java | 8 ++- .../netty/buffer/CompositeChannelBuffer.java | 15 ++++- .../netty/buffer/DuplicatedChannelBuffer.java | 4 +- .../netty/buffer/DynamicChannelBuffer.java | 4 +- .../jboss/netty/buffer/HeapChannelBuffer.java | 40 +++++++++----- .../netty/buffer/ReadOnlyChannelBuffer.java | 3 +- .../netty/buffer/SlicedChannelBuffer.java | 4 +- .../netty/buffer/TruncatedChannelBuffer.java | 4 +- .../netty/channel/socket/nio/NioWorker.java | 19 ++----- 10 files changed, 104 insertions(+), 52 deletions(-) diff --git a/src/main/java/org/jboss/netty/buffer/ByteBufferBackedChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/ByteBufferBackedChannelBuffer.java index 22da1b5c41..aacd22673a 100644 --- a/src/main/java/org/jboss/netty/buffer/ByteBufferBackedChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/ByteBufferBackedChannelBuffer.java @@ -22,7 +22,6 @@ */ package org.jboss.netty.buffer; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -201,39 +200,69 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer { return out.write((ByteBuffer) buffer.duplicate().position(index).limit(index + length)); } - public void setBytes(int index, InputStream in, int length) + public int setBytes(int index, InputStream in, int length) throws IOException { if (length == 0) { - return; + return 0; } + int readBytes = 0; + if (!buffer.isReadOnly() && buffer.hasArray()) { index += buffer.arrayOffset(); do { - int readBytes = in.read( - buffer.array(), index, length); - if (readBytes < 0) { - throw new EOFException(); + int localReadBytes = in.read(buffer.array(), index, length); + if (localReadBytes < 0) { + if (readBytes == 0) { + return -1; + } else { + break; + } } - index += readBytes; - length -= readBytes; + readBytes += localReadBytes; + index += localReadBytes; + length -= localReadBytes; } while (length > 0); } else { byte[] tmp = new byte[length]; for (int i = 0; i < tmp.length;) { - int readBytes = in.read(tmp, i, tmp.length - i); - if (readBytes < 0) { - throw new EOFException(); + int localReadBytes = in.read(tmp, i, tmp.length - i); + if (localReadBytes < 0) { + if (readBytes == 0) { + return -1; + } else { + break; + } } i += readBytes; } ((ByteBuffer) buffer.duplicate().position(index)).get(tmp); } + + return readBytes; } public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { - return in.read((ByteBuffer) buffer.duplicate().limit(index + length).position(index)); + + ByteBuffer slice = (ByteBuffer) buffer.duplicate().limit(index + length).position(index); + int readBytes = 0; + + while (readBytes < length) { + int localReadBytes = in.read(slice); + if (localReadBytes < 0) { + if (readBytes == 0) { + return -1; + } else { + return readBytes; + } + } else if (localReadBytes == 0) { + break; + } + readBytes += localReadBytes; + } + + return readBytes; } public ByteBuffer toByteBuffer(int index, int length) { diff --git a/src/main/java/org/jboss/netty/buffer/ChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/ChannelBuffer.java index daf9f5447e..3e6b575596 100644 --- a/src/main/java/org/jboss/netty/buffer/ChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/ChannelBuffer.java @@ -769,13 +769,16 @@ public interface ChannelBuffer extends Comparable { * * @param length the number of bytes to transfer * + * @return the actual number of bytes read in from the specified channel. + * {@code -1} if the specified channel is closed. + * * @throws IndexOutOfBoundsException * if the specified {@code index} is less than {@code 0} or * if {@code index + length} is greater than {@code this.capacity} * @throws IOException * if the specified stream threw an exception during I/O */ - void setBytes(int index, InputStream in, int length) throws IOException; + int setBytes(int index, InputStream in, int length) throws IOException; /** * Transfers the content of the specified source channel to this buffer @@ -783,7 +786,8 @@ public interface ChannelBuffer extends Comparable { * * @param length the maximum number of bytes to transfer * - * @return the actual number of bytes read in from the specified channel + * @return the actual number of bytes read in from the specified channel. + * {@code -1} if the specified channel is closed. * * @throws IndexOutOfBoundsException * if the specified {@code index} is less than {@code 0} or diff --git a/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java index a57b7e06aa..3d60983d65 100644 --- a/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/CompositeChannelBuffer.java @@ -343,7 +343,7 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer { } } - public void setBytes(int index, InputStream in, int length) + public int setBytes(int index, InputStream in, int length) throws IOException { int sliceId = sliceId(index); if (index + length >= capacity()) { @@ -351,15 +351,26 @@ public class CompositeChannelBuffer extends AbstractChannelBuffer { } int i = sliceId; + int readBytes = 0; + while (length > 0) { ChannelBuffer s = slices[i]; int adjustment = indices[i]; int localLength = Math.min(length, s.capacity() - (index - adjustment)); - s.setBytes(index - adjustment, in, localLength); + int localReadBytes = s.setBytes(index - adjustment, in, localLength); + if (localReadBytes < 0) { + if (readBytes == 0) { + return -1; + } else { + break; + } + } index += localLength; length -= localLength; i ++; } + + return readBytes; } public int setBytes(int index, ScatteringByteChannel in, int length) diff --git a/src/main/java/org/jboss/netty/buffer/DuplicatedChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/DuplicatedChannelBuffer.java index 0a7b9e8a2b..083c19aaf5 100644 --- a/src/main/java/org/jboss/netty/buffer/DuplicatedChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/DuplicatedChannelBuffer.java @@ -156,9 +156,9 @@ public class DuplicatedChannelBuffer extends AbstractChannelBuffer implements Wr return buffer.getBytes(index, out, length); } - public void setBytes(int index, InputStream in, int length) + public int setBytes(int index, InputStream in, int length) throws IOException { - buffer.setBytes(index, in, length); + return buffer.setBytes(index, in, length); } public int setBytes(int index, ScatteringByteChannel in, int length) diff --git a/src/main/java/org/jboss/netty/buffer/DynamicChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/DynamicChannelBuffer.java index ba95fa80bd..1e5b3c62a8 100644 --- a/src/main/java/org/jboss/netty/buffer/DynamicChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/DynamicChannelBuffer.java @@ -144,9 +144,9 @@ public class DynamicChannelBuffer extends AbstractChannelBuffer { buffer.setBytes(index, src); } - public void setBytes(int index, InputStream in, int length) + public int setBytes(int index, InputStream in, int length) throws IOException { - buffer.setBytes(index, in, length); + return buffer.setBytes(index, in, length); } public int setBytes(int index, ScatteringByteChannel in, int length) diff --git a/src/main/java/org/jboss/netty/buffer/HeapChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/HeapChannelBuffer.java index 8ca7e23a20..78a29cb738 100644 --- a/src/main/java/org/jboss/netty/buffer/HeapChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/HeapChannelBuffer.java @@ -22,7 +22,6 @@ */ package org.jboss.netty.buffer; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -114,29 +113,44 @@ public abstract class HeapChannelBuffer extends AbstractChannelBuffer { src.get(array, index, src.remaining()); } - public void setBytes(int index, InputStream in, int length) throws IOException { + public int setBytes(int index, InputStream in, int length) throws IOException { + int readBytes = 0; while (length > 0) { - int readBytes = in.read(array, index, length); - if (readBytes < 0) { - throw new EOFException(); + int localReadBytes = in.read(array, index, length); + if (localReadBytes < 0) { + if (readBytes == 0) { + return -1; + } else { + break; + } } - index += readBytes; - length -= readBytes; + readBytes += localReadBytes; + index += localReadBytes; + length -= localReadBytes; } + + return readBytes; } public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { ByteBuffer buf = ByteBuffer.wrap(array, index, length); - while (length > 0) { - int readBytes = in.read(buf); - if (readBytes < 0) { - throw new EOFException(); - } else if (readBytes == 0) { + int readBytes = 0; + + while (readBytes < length) { + int localReadBytes = in.read(buf); + if (localReadBytes < 0) { + if (readBytes == 0) { + return -1; + } else { + break; + } + } else if (localReadBytes == 0) { break; } + readBytes += localReadBytes; } - return buf.flip().remaining(); + return readBytes; } public ChannelBuffer slice(int index, int length) { diff --git a/src/main/java/org/jboss/netty/buffer/ReadOnlyChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/ReadOnlyChannelBuffer.java index a61af8074c..29f6cf5711 100644 --- a/src/main/java/org/jboss/netty/buffer/ReadOnlyChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/ReadOnlyChannelBuffer.java @@ -101,9 +101,10 @@ public class ReadOnlyChannelBuffer extends AbstractChannelBuffer implements Wrap rejectModification(); } - public void setBytes(int index, InputStream in, int length) + public int setBytes(int index, InputStream in, int length) throws IOException { rejectModification(); + return 0; } public int setBytes(int index, ScatteringByteChannel in, int length) diff --git a/src/main/java/org/jboss/netty/buffer/SlicedChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/SlicedChannelBuffer.java index b13740b3da..329b9087b6 100644 --- a/src/main/java/org/jboss/netty/buffer/SlicedChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/SlicedChannelBuffer.java @@ -178,10 +178,10 @@ public class SlicedChannelBuffer extends AbstractChannelBuffer implements Wrappe return buffer.getBytes(index + adjustment, out, length); } - public void setBytes(int index, InputStream in, int length) + public int setBytes(int index, InputStream in, int length) throws IOException { checkIndex(index, length); - buffer.setBytes(index + adjustment, in, length); + return buffer.setBytes(index + adjustment, in, length); } public int setBytes(int index, ScatteringByteChannel in, int length) diff --git a/src/main/java/org/jboss/netty/buffer/TruncatedChannelBuffer.java b/src/main/java/org/jboss/netty/buffer/TruncatedChannelBuffer.java index ce7779ae83..c703ee1816 100644 --- a/src/main/java/org/jboss/netty/buffer/TruncatedChannelBuffer.java +++ b/src/main/java/org/jboss/netty/buffer/TruncatedChannelBuffer.java @@ -172,10 +172,10 @@ public class TruncatedChannelBuffer extends AbstractChannelBuffer implements Wra return buffer.getBytes(index, out, length); } - public void setBytes(int index, InputStream in, int length) + public int setBytes(int index, InputStream in, int length) throws IOException { checkIndex(index, length); - buffer.setBytes(index, in, length); + return buffer.setBytes(index, in, length); } public int setBytes(int index, ScatteringByteChannel in, int length) diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 5352ec6c49..52eb89361b 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -25,9 +25,8 @@ package org.jboss.netty.channel.socket.nio; import static org.jboss.netty.channel.Channels.*; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; -import java.nio.channels.ReadableByteChannel; +import java.nio.channels.ScatteringByteChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; @@ -219,20 +218,20 @@ class NioWorker implements Runnable { } private static void read(SelectionKey k) { - ReadableByteChannel ch = (ReadableByteChannel) k.channel(); + ScatteringByteChannel ch = (ScatteringByteChannel) k.channel(); NioSocketChannel channel = (NioSocketChannel) k.attachment(); ReceiveBufferSizePredictor predictor = channel.getConfig().getReceiveBufferSizePredictor(); - ByteBuffer buf = ByteBuffer.allocate(predictor.nextReceiveBufferSize()); + ChannelBuffer buf = ChannelBuffers.buffer(predictor.nextReceiveBufferSize()); int ret = 0; int readBytes = 0; boolean failure = true; try { - while ((ret = ch.read(buf)) > 0) { + while ((ret = buf.writeBytes(ch, buf.writableBytes())) > 0) { readBytes += ret; - if (!buf.hasRemaining()) { + if (!buf.writable()) { break; } } @@ -246,13 +245,7 @@ class NioWorker implements Runnable { predictor.previousReceiveBufferSize(readBytes); // Fire the event. - ChannelBuffer buffer; - if (readBytes == buf.capacity()) { - buffer = ChannelBuffers.wrappedBuffer(buf.array()); - } else { - buffer = ChannelBuffers.wrappedBuffer(buf.array(), 0, readBytes); - } - fireMessageReceived(channel, buffer); + fireMessageReceived(channel, buf); } if (ret < 0 || failure) {