* Changed the type of WriteCompletionEvent.amount from int to long

* Added back FileRegion since mapped buffer does not outperform transferTo()
* Refactored SocketSendBufferPool to support FileRegion
This commit is contained in:
Trustin Lee 2010-02-23 07:18:58 +00:00
parent 1989bd037b
commit aa4b8a2a6c
11 changed files with 310 additions and 61 deletions

View File

@ -313,6 +313,20 @@ public class Channels {
return;
}
channel.getPipeline().sendUpstream(
new DefaultWriteCompletionEvent(channel, (long) amount));
}
/**
* Sends a {@code "writeComplete"} event to the first
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
* the specified {@link Channel}.
*/
public static void fireWriteComplete(Channel channel, long amount) {
if (amount == 0) {
return;
}
channel.getPipeline().sendUpstream(
new DefaultWriteCompletionEvent(channel, amount));
}
@ -324,9 +338,18 @@ public class Channels {
* {@link ChannelHandlerContext}.
*/
public static void fireWriteComplete(ChannelHandlerContext ctx, int amount) {
ctx.sendUpstream(new DefaultWriteCompletionEvent(ctx.getChannel(), amount));
ctx.sendUpstream(new DefaultWriteCompletionEvent(ctx.getChannel(), (long) amount));
}
/**
* Sends a {@code "writeComplete"} event to the
* {@link ChannelUpstreamHandler} which is placed in the closest upstream
* from the handler associated with the specified
* {@link ChannelHandlerContext}.
*/
public static void fireWriteComplete(ChannelHandlerContext ctx, long amount) {
ctx.sendUpstream(new DefaultWriteCompletionEvent(ctx.getChannel(), amount));
}
/**
* Sends a {@code "channelInterestChanged"} event to the first
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of

View File

@ -0,0 +1,43 @@
package org.jboss.netty.channel;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
public class DefaultFileRegion implements FileRegion {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultFileRegion.class);
private final FileChannel file;
private final long position;
private final long count;
public DefaultFileRegion(FileChannel file, long position, long count) {
this.file = file;
this.position = position;
this.count = count;
}
public long getPosition() {
return position;
}
public long getCount() {
return count;
}
public long transferTo(WritableByteChannel target) throws IOException {
return file.transferTo(position, count, target);
}
public void releaseExternalResources() {
try {
file.close();
} catch (IOException e) {
logger.warn("Failed to close a file.", e);
}
}
}

View File

@ -28,12 +28,20 @@ import static org.jboss.netty.channel.Channels.*;
public class DefaultWriteCompletionEvent implements WriteCompletionEvent {
private final Channel channel;
private final int writtenAmount;
private final long writtenAmount;
/**
* @deprecated Use {@link #DefaultWriteCompletionEvent(Channel, long)} instead.
*/
@Deprecated
public DefaultWriteCompletionEvent(Channel channel, int writtenAmount) {
this(channel, (long) writtenAmount);
}
/**
* Creates a new instance.
*/
public DefaultWriteCompletionEvent(Channel channel, int writtenAmount) {
public DefaultWriteCompletionEvent(Channel channel, long writtenAmount) {
if (channel == null) {
throw new NullPointerException("channel");
}
@ -54,7 +62,7 @@ public class DefaultWriteCompletionEvent implements WriteCompletionEvent {
return succeededFuture(getChannel());
}
public int getWrittenAmount() {
public long getWrittenAmount() {
return writtenAmount;
}

View File

@ -0,0 +1,47 @@
/*
* Copyright 2010 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import org.jboss.netty.util.ExternalResourceReleasable;
/**
* A region of a file that is sent via a {@link Channel} which supports
* zero-copy file transfer.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $
*/
public interface FileRegion extends ExternalResourceReleasable {
/**
* Returns the offset in the file where the transfer began.
*/
long getPosition();
/**
* Returns the number of bytes to transfer.
*/
long getCount();
/**
* Transfers the content of this file region to the specified channel.
*/
long transferTo(WritableByteChannel target) throws IOException;
}

View File

@ -34,5 +34,5 @@ public interface WriteCompletionEvent extends ChannelEvent {
* @return the number of written bytes or messages, depending on the
* type of the transport
*/
int getWrittenAmount();
long getWrittenAmount();
}

View File

@ -261,7 +261,7 @@ class NioDatagramChannel extends AbstractChannel
boolean success = super.offer(e);
assert success;
int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes();
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
@ -286,7 +286,7 @@ class NioDatagramChannel extends AbstractChannel
public MessageEvent poll() {
MessageEvent e = super.poll();
if (e != null) {
int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes();
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
@ -303,6 +303,14 @@ class NioDatagramChannel extends AbstractChannel
}
return e;
}
private int getMessageSize(MessageEvent e) {
Object m = e.getMessage();
if (m instanceof ChannelBuffer) {
return ((ChannelBuffer) m).readableBytes();
}
return 0;
}
}
/**

View File

@ -36,7 +36,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
@ -500,7 +499,7 @@ class NioDatagramWorker implements Runnable {
boolean addOpWrite = false;
boolean removeOpWrite = false;
int writtenBytes = 0;
long writtenBytes = 0;
final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
final DatagramChannel ch = channel.getDatagramChannel();
@ -514,7 +513,6 @@ class NioDatagramWorker implements Runnable {
for (;;) {
MessageEvent evt = channel.currentWriteEvent;
SendBuffer buf;
ByteBuffer bb;
if (evt == null) {
if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
removeOpWrite = true;
@ -522,20 +520,17 @@ class NioDatagramWorker implements Runnable {
break;
}
ChannelBuffer origBuf = (ChannelBuffer) evt.getMessage();
channel.currentWriteBuffer = buf = sendBufferPool.acquire(origBuf);
bb = buf.buffer;
channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
} else {
buf = channel.currentWriteBuffer;
bb = buf.buffer;
}
try {
int localWrittenBytes = 0;
long localWrittenBytes = 0;
SocketAddress raddr = evt.getRemoteAddress();
if (raddr == null) {
for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = ch.write(bb);
localWrittenBytes = buf.transferTo(ch);
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
@ -543,7 +538,7 @@ class NioDatagramWorker implements Runnable {
}
} else {
for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = ch.send(bb, raddr);
localWrittenBytes = buf.transferTo(ch, raddr);
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
@ -559,7 +554,6 @@ class NioDatagramWorker implements Runnable {
channel.currentWriteBuffer = null;
evt = null;
buf = null;
bb = null;
future.setSuccess();
} else {
// Not written at all - perhaps the kernel buffer is full.
@ -576,7 +570,6 @@ class NioDatagramWorker implements Runnable {
channel.currentWriteBuffer = null;
buf = null;
evt = null;
bb = null;
future.setFailure(t);
fireExceptionCaught(channel, t);
}

View File

@ -209,7 +209,7 @@ class NioSocketChannel extends AbstractChannel
boolean success = super.offer(e);
assert success;
int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes();
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
@ -230,7 +230,7 @@ class NioSocketChannel extends AbstractChannel
public MessageEvent poll() {
MessageEvent e = super.poll();
if (e != null) {
int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes();
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
@ -247,6 +247,14 @@ class NioSocketChannel extends AbstractChannel
}
return e;
}
private int getMessageSize(MessageEvent e) {
Object m = e.getMessage();
if (m instanceof ChannelBuffer) {
return ((ChannelBuffer) m).readableBytes();
}
return 0;
}
}
private final class WriteTask implements Runnable {

View File

@ -439,7 +439,7 @@ class NioWorker implements Runnable {
boolean addOpWrite = false;
boolean removeOpWrite = false;
int writtenBytes = 0;
long writtenBytes = 0;
final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
final SocketChannel ch = channel.socket;
@ -450,7 +450,6 @@ class NioWorker implements Runnable {
for (;;) {
MessageEvent evt = channel.currentWriteEvent;
SendBuffer buf;
ByteBuffer bb;
if (evt == null) {
if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
removeOpWrite = true;
@ -458,44 +457,41 @@ class NioWorker implements Runnable {
break;
}
ChannelBuffer origBuf = (ChannelBuffer) evt.getMessage();
channel.currentWriteBuffer = buf = sendBufferPool.acquire(origBuf);
bb = buf.buffer;
channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
} else {
buf = channel.currentWriteBuffer;
bb = buf.buffer;
}
ChannelFuture future = evt.getFuture();
try {
int oldWrittenBytes = writtenBytes;
long localWrittenBytes = 0;
for (int i = writeSpinCount; i > 0; i --) {
int localWrittenBytes = ch.write(bb);
localWrittenBytes = buf.transferTo(ch);
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
}
}
if (!bb.hasRemaining()) {
if (buf.finished()) {
// Successful write - proceed to the next message.
buf.release();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
evt = null;
buf = null;
bb = null;
future.setSuccess();
} else {
// Not written fully - perhaps the kernel buffer is full.
addOpWrite = true;
channel.writeSuspended = true;
// Notify progress listeners if necessary.
future.setProgress(
writtenBytes - oldWrittenBytes,
bb.position() - buf.initialPos,
bb.limit() - buf.initialPos);
if (localWrittenBytes > 0) {
// Notify progress listeners if necessary.
future.setProgress(
localWrittenBytes,
buf.writtenBytes(), buf.totalBytes());
}
break;
}
} catch (AsynchronousCloseException e) {
@ -506,7 +502,6 @@ class NioWorker implements Runnable {
channel.currentWriteBuffer = null;
buf = null;
evt = null;
bb = null;
future.setFailure(t);
fireExceptionCaught(channel, t);
if (t instanceof IOException) {

View File

@ -15,10 +15,15 @@
*/
package org.jboss.netty.channel.socket.nio;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.WritableByteChannel;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.FileRegion;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
@ -42,27 +47,34 @@ final class SocketSendBufferPool {
super();
}
final SendBuffer acquire(ChannelBuffer src) {
if (src.isDirect()) {
return new SendBuffer(null, src.toByteBuffer());
}
if (src.readableBytes() > DEFAULT_PREALLOCATION_SIZE) {
return new SendBuffer(null, src.toByteBuffer());
final SendBuffer acquire(Object message) {
if (message instanceof ChannelBuffer) {
return acquire((ChannelBuffer) message);
} else if (message instanceof FileRegion) {
return acquire((FileRegion) message);
}
SendBuffer dst = acquire(src.readableBytes());
ByteBuffer dstbuf = dst.buffer;
dstbuf.mark();
src.getBytes(src.readerIndex(), dstbuf);
dstbuf.reset();
return dst;
throw new IllegalArgumentException(
"unsupported message type: " + message.getClass());
}
private final SendBuffer acquire(int size) {
assert size <= DEFAULT_PREALLOCATION_SIZE;
private final SendBuffer acquire(FileRegion src) {
return new FileSendBuffer(src);
}
private final SendBuffer acquire(ChannelBuffer src) {
if (src.isDirect()) {
return new UnpooledSendBuffer(src.toByteBuffer());
}
if (src.readableBytes() > DEFAULT_PREALLOCATION_SIZE) {
return new UnpooledSendBuffer(src.toByteBuffer());
}
final int size = src.readableBytes();
Preallocation current = this.current;
ByteBuffer buffer = current.buffer;
int remaining = buffer.remaining();
PooledSendBuffer dst;
if (size < remaining) {
int nextPos = buffer.position() + size;
@ -70,7 +82,7 @@ final class SocketSendBufferPool {
buffer.position(align(nextPos));
slice.limit(nextPos);
current.refCnt ++;
return new SendBuffer(current, slice);
dst = new PooledSendBuffer(current, slice);
} else if (size > remaining) {
this.current = current = getPreallocation();
buffer = current.buffer;
@ -78,12 +90,18 @@ final class SocketSendBufferPool {
buffer.position(align(size));
slice.limit(size);
current.refCnt ++;
return new SendBuffer(current, slice);
dst = new PooledSendBuffer(current, slice);
} else { // size == remaining
current.refCnt ++;
this.current = getPreallocation0();
return new SendBuffer(current, current.buffer);
dst = new PooledSendBuffer(current, current.buffer);
}
ByteBuffer dstbuf = dst.buffer;
dstbuf.mark();
src.getBytes(src.readerIndex(), dstbuf);
dstbuf.reset();
return dst;
}
private final Preallocation getPreallocation() {
@ -142,20 +160,85 @@ final class SocketSendBufferPool {
}
}
final class SendBuffer {
interface SendBuffer {
boolean finished();
long writtenBytes();
long totalBytes();
long transferTo(WritableByteChannel ch) throws IOException;
long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException;
void release();
}
class UnpooledSendBuffer implements SendBuffer {
final ByteBuffer buffer;
final int initialPos;
UnpooledSendBuffer(ByteBuffer buffer) {
this.buffer = buffer;
initialPos = buffer.position();
}
public final boolean finished() {
return !buffer.hasRemaining();
}
public final long writtenBytes() {
return buffer.position() - initialPos;
}
public final long totalBytes() {
return buffer.limit() - initialPos;
}
public final long transferTo(WritableByteChannel ch) throws IOException {
return ch.write(buffer);
}
public final long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
return ch.send(buffer, raddr);
}
public void release() {
// Unpooled.
}
}
final class PooledSendBuffer implements SendBuffer {
private final Preallocation parent;
final ByteBuffer buffer;
final int initialPos;
SendBuffer(Preallocation parent, ByteBuffer buffer) {
PooledSendBuffer(Preallocation parent, ByteBuffer buffer) {
this.parent = parent;
this.buffer = buffer;
initialPos = buffer.position();
}
void release() {
public boolean finished() {
return !buffer.hasRemaining();
}
public long writtenBytes() {
return buffer.position() - initialPos;
}
public long totalBytes() {
return buffer.limit() - initialPos;
}
public long transferTo(WritableByteChannel ch) throws IOException {
return ch.write(buffer);
}
public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
return ch.send(buffer, raddr);
}
public void release() {
final Preallocation parent = this.parent;
if (parent != null && -- parent.refCnt == 0) {
if (-- parent.refCnt == 0) {
parent.buffer.clear();
if (parent != current) {
poolHead = new PreallocationRef(parent, poolHead);
@ -163,4 +246,42 @@ final class SocketSendBufferPool {
}
}
}
final class FileSendBuffer implements SendBuffer {
private final FileRegion file;
private long writtenBytes;
FileSendBuffer(FileRegion file) {
this.file = file;
}
public boolean finished() {
return writtenBytes >= file.getCount();
}
public long writtenBytes() {
return writtenBytes;
}
public long totalBytes() {
return file.getCount();
}
public long transferTo(WritableByteChannel ch) throws IOException {
long localWrittenBytes = file.transferTo(ch);
writtenBytes += localWrittenBytes;
return localWrittenBytes;
}
public long transferTo(DatagramChannel ch, SocketAddress raddr)
throws IOException {
throw new UnsupportedOperationException();
}
public void release() {
// Unpooled.
}
}
}

View File

@ -32,6 +32,7 @@ import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.DefaultFileRegion;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
@ -40,7 +41,6 @@ import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.stream.ChunkedFile;
import org.jboss.netty.util.CharsetUtil;
/**
@ -91,7 +91,8 @@ public class HttpStaticFileServerHandler extends SimpleChannelUpstreamHandler {
ch.write(response);
// Write the content.
ChannelFuture writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192));
//ChannelFuture writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192));
ChannelFuture writeFuture = ch.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength));
// Decide whether to close the connection or not.
if (!isKeepAlive(request)) {
@ -105,6 +106,8 @@ public class HttpStaticFileServerHandler extends SimpleChannelUpstreamHandler {
throws Exception {
Channel ch = e.getChannel();
Throwable cause = e.getCause();
cause.printStackTrace();
System.exit(1);
if (cause instanceof TooLongFrameException) {
sendError(ctx, BAD_REQUEST);
return;