Split AbstractNioChannel into two subtypes

- AbstractNioMessageChannel and AbstractNioStreamChannel
- Better performance
- Replaced 'if' checks with polymorphism
This commit is contained in:
Trustin Lee 2012-05-27 05:07:23 -07:00
parent 7327bb3522
commit 064b3dc9e5
7 changed files with 226 additions and 147 deletions

View File

@ -579,6 +579,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
@Override
public void flushNow() {
try {
doFlush(out());

View File

@ -15,13 +15,10 @@
*/
package io.netty.channel.socket.nio;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
@ -32,7 +29,6 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.Queue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -94,11 +90,6 @@ public abstract class AbstractNioChannel extends AbstractChannel {
return (NioUnsafe) super.unsafe();
}
@Override
protected NioUnsafe newUnsafe() {
return new DefaultNioUnsafe();
}
protected SelectableChannel javaChannel() {
return ch;
}
@ -134,29 +125,13 @@ public abstract class AbstractNioChannel extends AbstractChannel {
protected abstract void doFinishConnect() throws Exception;
protected int doReadMessages(Queue<Object> buf) throws Exception {
throw new UnsupportedOperationException();
}
protected int doReadBytes(ChannelBuffer buf) throws Exception {
throw new UnsupportedOperationException();
}
protected int doWriteMessages(Queue<Object> buf, boolean lastSpin) throws Exception {
throw new UnsupportedOperationException();
}
protected int doWriteBytes(ChannelBuffer buf, boolean lastSpin) throws Exception {
throw new UnsupportedOperationException();
}
public interface NioUnsafe extends Unsafe {
java.nio.channels.Channel ch();
void finishConnect();
void read();
}
private class DefaultNioUnsafe extends AbstractUnsafe implements NioUnsafe {
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
@Override
public java.nio.channels.Channel ch() {
return javaChannel();
@ -240,122 +215,5 @@ public abstract class AbstractNioChannel extends AbstractChannel {
connectFuture = null;
}
}
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final ChannelBufferHolder<Object> buf = pipeline.nextIn();
boolean closed = false;
boolean read = false;
try {
if (buf.hasMessageBuffer()) {
Queue<Object> msgBuf = buf.messageBuffer();
for (;;) {
int localReadAmount = doReadMessages(msgBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount == 0) {
break;
} else if (localReadAmount < 0) {
closed = true;
break;
}
}
} else {
ChannelBuffer byteBuf = buf.byteBuffer();
for (;;) {
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
break;
}
if (!expandReadBuffer(byteBuf)) {
break;
}
}
}
} catch (Throwable t) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
}
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (closed && isOpen()) {
close(voidFuture());
}
}
}
}
@Override
protected void doFlush(ChannelBufferHolder<Object> buf) throws Exception {
if (buf.hasByteBuffer()) {
flushByteBuf(buf.byteBuffer());
} else {
flushMessageBuf(buf.messageBuffer());
}
}
private void flushByteBuf(ChannelBuffer buf) throws Exception {
if (!buf.readable()) {
// Reset reader/writerIndex to 0 if the buffer is empty.
buf.clear();
return;
}
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf, i == 0);
if (localFlushedAmount > 0) {
writeCounter += localFlushedAmount;
notifyFlushFutures();
break;
}
if (!buf.readable()) {
// Reset reader/writerIndex to 0 if the buffer is empty.
buf.clear();
break;
}
}
}
private void flushMessageBuf(Queue<Object> buf) throws Exception {
final int writeSpinCount = config().getWriteSpinCount() - 1;
while (!buf.isEmpty()) {
boolean wrote = false;
for (int i = writeSpinCount; i >= 0; i --) {
int localFlushedAmount = doWriteMessages(buf, i == 0);
if (localFlushedAmount > 0) {
writeCounter += localFlushedAmount;
wrote = true;
notifyFlushFutures();
break;
}
}
if (!wrote) {
break;
}
}
}
private static boolean expandReadBuffer(ChannelBuffer byteBuf) {
if (!byteBuf.writable()) {
// FIXME: Magic number
byteBuf.ensureWritableBytes(4096);
return true;
}
return false;
}
}

View File

@ -0,0 +1,100 @@
package io.netty.channel.socket.nio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelPipeline;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.util.Queue;
abstract class AbstractNioMessageChannel extends AbstractNioChannel {
private final ChannelBufferHolder<Object> firstOut = ChannelBufferHolders.messageBuffer();
protected AbstractNioMessageChannel(
Channel parent, Integer id, SelectableChannel ch, int defaultInterestOps) {
super(parent, id, ch, defaultInterestOps);
}
@Override
protected ChannelBufferHolder<Object> firstOut() {
return firstOut;
}
@Override
protected Unsafe newUnsafe() {
return new NioMessageUnsafe();
}
@Override
protected void doFlush(ChannelBufferHolder<Object> buf) throws Exception {
flushMessageBuf(buf.messageBuffer());
}
private void flushMessageBuf(Queue<Object> buf) throws Exception {
final int writeSpinCount = config().getWriteSpinCount() - 1;
while (!buf.isEmpty()) {
boolean wrote = false;
for (int i = writeSpinCount; i >= 0; i --) {
int localFlushedAmount = doWriteMessages(buf, i == 0);
if (localFlushedAmount > 0) {
writeCounter += localFlushedAmount;
wrote = true;
notifyFlushFutures();
break;
}
}
if (!wrote) {
break;
}
}
}
private class NioMessageUnsafe extends AbstractNioUnsafe {
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final ChannelBufferHolder<Object> buf = pipeline.nextIn();
boolean closed = false;
boolean read = false;
try {
Queue<Object> msgBuf = buf.messageBuffer();
for (;;) {
int localReadAmount = doReadMessages(msgBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount == 0) {
break;
} else if (localReadAmount < 0) {
closed = true;
break;
}
}
} catch (Throwable t) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
}
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (closed && isOpen()) {
close(voidFuture());
}
}
}
}
protected abstract int doReadMessages(Queue<Object> buf) throws Exception;
protected abstract int doWriteMessages(Queue<Object> buf, boolean lastSpin) throws Exception;
}

View File

@ -0,0 +1,115 @@
package io.netty.channel.socket.nio;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelPipeline;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
abstract class AbstractNioStreamChannel extends AbstractNioChannel {
private final ChannelBufferHolder<?> firstOut = ChannelBufferHolders.byteBuffer();
protected AbstractNioStreamChannel(
Channel parent, Integer id, SelectableChannel ch) {
super(parent, id, ch, SelectionKey.OP_READ);
}
@Override
@SuppressWarnings("unchecked")
protected ChannelBufferHolder<Object> firstOut() {
return (ChannelBufferHolder<Object>) firstOut;
}
@Override
protected Unsafe newUnsafe() {
return new NioStreamUnsafe();
}
private class NioStreamUnsafe extends AbstractNioUnsafe {
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final ChannelBufferHolder<Object> buf = pipeline.nextIn();
boolean closed = false;
boolean read = false;
try {
ChannelBuffer byteBuf = buf.byteBuffer();
for (;;) {
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
break;
}
if (!expandReadBuffer(byteBuf)) {
break;
}
}
} catch (Throwable t) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
}
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (closed && isOpen()) {
close(voidFuture());
}
}
}
}
@Override
protected void doFlush(ChannelBufferHolder<Object> buf) throws Exception {
flushByteBuf(buf.byteBuffer());
}
private void flushByteBuf(ChannelBuffer buf) throws Exception {
if (!buf.readable()) {
// Reset reader/writerIndex to 0 if the buffer is empty.
buf.clear();
return;
}
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf, i == 0);
if (localFlushedAmount > 0) {
writeCounter += localFlushedAmount;
notifyFlushFutures();
break;
}
if (!buf.readable()) {
// Reset reader/writerIndex to 0 if the buffer is empty.
buf.clear();
break;
}
}
}
protected abstract int doReadBytes(ChannelBuffer buf) throws Exception;
protected abstract int doWriteBytes(ChannelBuffer buf, boolean lastSpin) throws Exception;
private static boolean expandReadBuffer(ChannelBuffer byteBuf) {
if (!byteBuf.writable()) {
// FIXME: Magic number
byteBuf.ensureWritableBytes(4096);
return true;
}
return false;
}
}

View File

@ -44,7 +44,7 @@ import java.util.Queue;
/**
* Provides an NIO based {@link io.netty.channel.socket.DatagramChannel}.
*/
public final class NioDatagramChannel extends AbstractNioChannel implements io.netty.channel.socket.DatagramChannel {
public final class NioDatagramChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.DatagramChannel {
private final DatagramChannelConfig config;
private final Map<InetAddress, List<MembershipKey>> memberships =

View File

@ -28,7 +28,7 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.util.Queue;
public class NioServerSocketChannel extends AbstractNioChannel
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
private static ServerSocketChannel newSocket() {
@ -120,4 +120,9 @@ public class NioServerSocketChannel extends AbstractNioChannel
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected int doWriteMessages(Queue<Object> buf, boolean lastSpin) throws Exception {
throw new UnsupportedOperationException();
}
}

View File

@ -30,7 +30,7 @@ import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
public class NioSocketChannel extends AbstractNioChannel implements io.netty.channel.socket.SocketChannel {
public class NioSocketChannel extends AbstractNioStreamChannel implements io.netty.channel.socket.SocketChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
@ -54,7 +54,7 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha
}
public NioSocketChannel(Channel parent, Integer id, SocketChannel socket) {
super(parent, id, socket, SelectionKey.OP_READ);
super(parent, id, socket);
try {
socket.configureBlocking(false);
} catch (IOException e) {