From 7b05f3417162caffee26aef59e005710bb8bac1c Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Sun, 27 May 2012 05:19:45 -0700 Subject: [PATCH] Split AbstractOioChannel into its subtypes - AbstractOioMessageChannel and AbstractOioStreamChannel - Replaced 'if' with polymorphism - Better performance --- .../socket/oio/AbstractOioChannel.java | 103 +----------------- .../socket/oio/AbstractOioMessageChannel.java | 83 ++++++++++++++ .../socket/oio/AbstractOioStreamChannel.java | 85 +++++++++++++++ .../socket/oio/OioDatagramChannel.java | 2 +- .../socket/oio/OioServerSocketChannel.java | 7 +- .../channel/socket/oio/OioSocketChannel.java | 2 +- 6 files changed, 177 insertions(+), 105 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java create mode 100644 transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java index c46809e0a6..4474446e98 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java @@ -1,17 +1,12 @@ package io.netty.channel.socket.oio; -import io.netty.buffer.ChannelBuffer; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; -import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoop; -import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.Queue; abstract class AbstractOioChannel extends AbstractChannel { @@ -34,11 +29,6 @@ abstract class AbstractOioChannel extends AbstractChannel { return (OioUnsafe) super.unsafe(); } - @Override - protected OioUnsafe newUnsafe() { - return new DefaultOioUnsafe(); - } - @Override protected boolean isCompatible(EventLoop loop) { return loop instanceof OioChildEventLoop; @@ -63,7 +53,7 @@ abstract class AbstractOioChannel extends AbstractChannel { void read(); } - private class DefaultOioUnsafe extends AbstractUnsafe implements OioUnsafe { + protected abstract class AbstractOioUnsafe extends AbstractUnsafe implements OioUnsafe { @Override public void connect( @@ -95,99 +85,8 @@ abstract class AbstractOioChannel extends AbstractChannel { }); } } - - @Override - public void read() { - assert eventLoop().inEventLoop(); - - final ChannelPipeline pipeline = pipeline(); - final ChannelBufferHolder buf = pipeline.nextIn(); - boolean closed = false; - boolean read = false; - try { - if (buf.hasMessageBuffer()) { - Queue msgBuf = buf.messageBuffer(); - int localReadAmount = doReadMessages(msgBuf); - if (localReadAmount > 0) { - read = true; - } else if (localReadAmount < 0) { - closed = true; - } - } else { - ChannelBuffer byteBuf = buf.byteBuffer(); - int localReadAmount = doReadBytes(byteBuf); - if (localReadAmount > 0) { - read = true; - } else if (localReadAmount < 0) { - closed = true; - } - } - } catch (Throwable t) { - if (read) { - read = false; - pipeline.fireInboundBufferUpdated(); - } - pipeline().fireExceptionCaught(t); - if (t instanceof IOException) { - close(voidFuture()); - } - } finally { - if (read) { - pipeline.fireInboundBufferUpdated(); - } - if (closed && isOpen()) { - close(voidFuture()); - } - } - } } protected abstract void doConnect( SocketAddress remoteAddress, SocketAddress localAddress) throws Exception; - - protected int doReadMessages(Queue buf) throws Exception { - throw new UnsupportedOperationException(); - } - - protected int doReadBytes(ChannelBuffer buf) throws Exception { - throw new UnsupportedOperationException(); - } - - @Override - protected void doFlush(ChannelBufferHolder buf) throws Exception { - if (buf.hasByteBuffer()) { - flushByteBuf(buf.byteBuffer()); - } else { - flushMessageBuf(buf.messageBuffer()); - } - } - - private void flushByteBuf(ChannelBuffer buf) throws Exception { - while (buf.readable()) { - int localFlushedAmount = doWriteBytes(buf); - if (localFlushedAmount > 0) { - writeCounter += localFlushedAmount; - notifyFlushFutures(); - } - } - buf.clear(); - } - - private void flushMessageBuf(Queue buf) throws Exception { - while (!buf.isEmpty()) { - int localFlushedAmount = doWriteMessages(buf); - if (localFlushedAmount > 0) { - writeCounter += localFlushedAmount; - notifyFlushFutures(); - } - } - } - - protected int doWriteMessages(Queue buf) throws Exception { - throw new UnsupportedOperationException(); - } - - protected int doWriteBytes(ChannelBuffer buf) throws Exception { - throw new UnsupportedOperationException(); - } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java new file mode 100644 index 0000000000..0b08b55011 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java @@ -0,0 +1,83 @@ +package io.netty.channel.socket.oio; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelPipeline; + +import java.io.IOException; +import java.util.Queue; + +abstract class AbstractOioMessageChannel extends AbstractOioChannel { + + private final ChannelBufferHolder firstOut = ChannelBufferHolders.messageBuffer(); + + protected AbstractOioMessageChannel(Channel parent, Integer id) { + super(parent, id); + } + + @Override + protected ChannelBufferHolder firstOut() { + return firstOut; + } + + @Override + protected Unsafe newUnsafe() { + return new OioMessageUnsafe(); + } + + private class OioMessageUnsafe extends AbstractOioUnsafe { + @Override + public void read() { + assert eventLoop().inEventLoop(); + + final ChannelPipeline pipeline = pipeline(); + final ChannelBufferHolder buf = pipeline.nextIn(); + boolean closed = false; + boolean read = false; + try { + Queue msgBuf = buf.messageBuffer(); + int localReadAmount = doReadMessages(msgBuf); + if (localReadAmount > 0) { + read = true; + } else if (localReadAmount < 0) { + closed = true; + } + } catch (Throwable t) { + if (read) { + read = false; + pipeline.fireInboundBufferUpdated(); + } + pipeline().fireExceptionCaught(t); + if (t instanceof IOException) { + close(voidFuture()); + } + } finally { + if (read) { + pipeline.fireInboundBufferUpdated(); + } + if (closed && isOpen()) { + close(voidFuture()); + } + } + } + } + + @Override + protected void doFlush(ChannelBufferHolder buf) throws Exception { + flushMessageBuf(buf.messageBuffer()); + } + + private void flushMessageBuf(Queue buf) throws Exception { + while (!buf.isEmpty()) { + int localFlushedAmount = doWriteMessages(buf); + if (localFlushedAmount > 0) { + writeCounter += localFlushedAmount; + notifyFlushFutures(); + } + } + } + + protected abstract int doReadMessages(Queue buf) throws Exception; + protected abstract int doWriteMessages(Queue buf) throws Exception; +} diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java new file mode 100644 index 0000000000..771bee6531 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java @@ -0,0 +1,85 @@ +package io.netty.channel.socket.oio; + +import io.netty.buffer.ChannelBuffer; +import io.netty.channel.Channel; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelPipeline; + +import java.io.IOException; + +abstract class AbstractOioStreamChannel extends AbstractOioChannel { + + private final ChannelBufferHolder firstOut = ChannelBufferHolders.byteBuffer(); + + protected AbstractOioStreamChannel(Channel parent, Integer id) { + super(parent, id); + } + + @Override + @SuppressWarnings("unchecked") + protected ChannelBufferHolder firstOut() { + return (ChannelBufferHolder) firstOut; + } + + @Override + protected Unsafe newUnsafe() { + return new OioStreamUnsafe(); + } + + private class OioStreamUnsafe extends AbstractOioUnsafe { + @Override + public void read() { + assert eventLoop().inEventLoop(); + + final ChannelPipeline pipeline = pipeline(); + final ChannelBufferHolder buf = pipeline.nextIn(); + boolean closed = false; + boolean read = false; + try { + ChannelBuffer byteBuf = buf.byteBuffer(); + int localReadAmount = doReadBytes(byteBuf); + if (localReadAmount > 0) { + read = true; + } else if (localReadAmount < 0) { + closed = true; + } + } catch (Throwable t) { + if (read) { + read = false; + pipeline.fireInboundBufferUpdated(); + } + pipeline().fireExceptionCaught(t); + if (t instanceof IOException) { + close(voidFuture()); + } + } finally { + if (read) { + pipeline.fireInboundBufferUpdated(); + } + if (closed && isOpen()) { + close(voidFuture()); + } + } + } + } + + @Override + protected void doFlush(ChannelBufferHolder buf) throws Exception { + flushByteBuf(buf.byteBuffer()); + } + + private void flushByteBuf(ChannelBuffer buf) throws Exception { + while (buf.readable()) { + int localFlushedAmount = doWriteBytes(buf); + if (localFlushedAmount > 0) { + writeCounter += localFlushedAmount; + notifyFlushFutures(); + } + } + buf.clear(); + } + + protected abstract int doReadBytes(ChannelBuffer buf) throws Exception; + protected abstract int doWriteBytes(ChannelBuffer buf) throws Exception; +} diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java index 8ea48b7153..290056ace2 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java @@ -38,7 +38,7 @@ import java.net.SocketException; import java.net.SocketTimeoutException; import java.util.Queue; -public class OioDatagramChannel extends AbstractOioChannel +public class OioDatagramChannel extends AbstractOioMessageChannel implements DatagramChannel { private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java index b095930ce9..bf42590a01 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java @@ -34,7 +34,7 @@ import java.util.Queue; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -public class OioServerSocketChannel extends AbstractOioChannel +public class OioServerSocketChannel extends AbstractOioMessageChannel implements ServerSocketChannel { private static final InternalLogger logger = @@ -174,4 +174,9 @@ public class OioServerSocketChannel extends AbstractOioChannel protected void doDisconnect() throws Exception { throw new UnsupportedOperationException(); } + + @Override + protected int doWriteMessages(Queue buf) throws Exception { + throw new UnsupportedOperationException(); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java index 0959c5aedf..8bc90c6742 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java @@ -34,7 +34,7 @@ import java.net.SocketAddress; import java.net.SocketTimeoutException; import java.nio.channels.NotYetConnectedException; -public class OioSocketChannel extends AbstractOioChannel +public class OioSocketChannel extends AbstractOioStreamChannel implements SocketChannel { private static final InternalLogger logger =