Split AbstractOioChannel into its subtypes

- AbstractOioMessageChannel and AbstractOioStreamChannel
- Replaced 'if' with polymorphism
- Better performance
This commit is contained in:
Trustin Lee 2012-05-27 05:19:45 -07:00
parent 064b3dc9e5
commit 7b05f34171
6 changed files with 177 additions and 105 deletions

View File

@ -1,17 +1,12 @@
package io.netty.channel.socket.oio; package io.netty.channel.socket.oio;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.AbstractChannel; import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Queue;
abstract class AbstractOioChannel extends AbstractChannel { abstract class AbstractOioChannel extends AbstractChannel {
@ -34,11 +29,6 @@ abstract class AbstractOioChannel extends AbstractChannel {
return (OioUnsafe) super.unsafe(); return (OioUnsafe) super.unsafe();
} }
@Override
protected OioUnsafe newUnsafe() {
return new DefaultOioUnsafe();
}
@Override @Override
protected boolean isCompatible(EventLoop loop) { protected boolean isCompatible(EventLoop loop) {
return loop instanceof OioChildEventLoop; return loop instanceof OioChildEventLoop;
@ -63,7 +53,7 @@ abstract class AbstractOioChannel extends AbstractChannel {
void read(); void read();
} }
private class DefaultOioUnsafe extends AbstractUnsafe implements OioUnsafe { protected abstract class AbstractOioUnsafe extends AbstractUnsafe implements OioUnsafe {
@Override @Override
public void connect( public void connect(
@ -95,99 +85,8 @@ abstract class AbstractOioChannel extends AbstractChannel {
}); });
} }
} }
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final ChannelBufferHolder<Object> buf = pipeline.nextIn();
boolean closed = false;
boolean read = false;
try {
if (buf.hasMessageBuffer()) {
Queue<Object> msgBuf = buf.messageBuffer();
int localReadAmount = doReadMessages(msgBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
}
} else {
ChannelBuffer byteBuf = buf.byteBuffer();
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
}
}
} catch (Throwable t) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
}
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (closed && isOpen()) {
close(voidFuture());
}
}
}
} }
protected abstract void doConnect( protected abstract void doConnect(
SocketAddress remoteAddress, SocketAddress localAddress) throws Exception; SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
protected int doReadMessages(Queue<Object> buf) throws Exception {
throw new UnsupportedOperationException();
}
protected int doReadBytes(ChannelBuffer buf) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doFlush(ChannelBufferHolder<Object> buf) throws Exception {
if (buf.hasByteBuffer()) {
flushByteBuf(buf.byteBuffer());
} else {
flushMessageBuf(buf.messageBuffer());
}
}
private void flushByteBuf(ChannelBuffer buf) throws Exception {
while (buf.readable()) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount > 0) {
writeCounter += localFlushedAmount;
notifyFlushFutures();
}
}
buf.clear();
}
private void flushMessageBuf(Queue<Object> buf) throws Exception {
while (!buf.isEmpty()) {
int localFlushedAmount = doWriteMessages(buf);
if (localFlushedAmount > 0) {
writeCounter += localFlushedAmount;
notifyFlushFutures();
}
}
}
protected int doWriteMessages(Queue<Object> buf) throws Exception {
throw new UnsupportedOperationException();
}
protected int doWriteBytes(ChannelBuffer buf) throws Exception {
throw new UnsupportedOperationException();
}
} }

View File

@ -0,0 +1,83 @@
package io.netty.channel.socket.oio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelPipeline;
import java.io.IOException;
import java.util.Queue;
abstract class AbstractOioMessageChannel extends AbstractOioChannel {
private final ChannelBufferHolder<Object> firstOut = ChannelBufferHolders.messageBuffer();
protected AbstractOioMessageChannel(Channel parent, Integer id) {
super(parent, id);
}
@Override
protected ChannelBufferHolder<Object> firstOut() {
return firstOut;
}
@Override
protected Unsafe newUnsafe() {
return new OioMessageUnsafe();
}
private class OioMessageUnsafe extends AbstractOioUnsafe {
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final ChannelBufferHolder<Object> buf = pipeline.nextIn();
boolean closed = false;
boolean read = false;
try {
Queue<Object> msgBuf = buf.messageBuffer();
int localReadAmount = doReadMessages(msgBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
}
} catch (Throwable t) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
}
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (closed && isOpen()) {
close(voidFuture());
}
}
}
}
@Override
protected void doFlush(ChannelBufferHolder<Object> buf) throws Exception {
flushMessageBuf(buf.messageBuffer());
}
private void flushMessageBuf(Queue<Object> buf) throws Exception {
while (!buf.isEmpty()) {
int localFlushedAmount = doWriteMessages(buf);
if (localFlushedAmount > 0) {
writeCounter += localFlushedAmount;
notifyFlushFutures();
}
}
}
protected abstract int doReadMessages(Queue<Object> buf) throws Exception;
protected abstract int doWriteMessages(Queue<Object> buf) throws Exception;
}

View File

@ -0,0 +1,85 @@
package io.netty.channel.socket.oio;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelPipeline;
import java.io.IOException;
abstract class AbstractOioStreamChannel extends AbstractOioChannel {
private final ChannelBufferHolder<?> firstOut = ChannelBufferHolders.byteBuffer();
protected AbstractOioStreamChannel(Channel parent, Integer id) {
super(parent, id);
}
@Override
@SuppressWarnings("unchecked")
protected ChannelBufferHolder<Object> firstOut() {
return (ChannelBufferHolder<Object>) firstOut;
}
@Override
protected Unsafe newUnsafe() {
return new OioStreamUnsafe();
}
private class OioStreamUnsafe extends AbstractOioUnsafe {
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final ChannelBufferHolder<Object> buf = pipeline.nextIn();
boolean closed = false;
boolean read = false;
try {
ChannelBuffer byteBuf = buf.byteBuffer();
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
}
} catch (Throwable t) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
}
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (closed && isOpen()) {
close(voidFuture());
}
}
}
}
@Override
protected void doFlush(ChannelBufferHolder<Object> buf) throws Exception {
flushByteBuf(buf.byteBuffer());
}
private void flushByteBuf(ChannelBuffer buf) throws Exception {
while (buf.readable()) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount > 0) {
writeCounter += localFlushedAmount;
notifyFlushFutures();
}
}
buf.clear();
}
protected abstract int doReadBytes(ChannelBuffer buf) throws Exception;
protected abstract int doWriteBytes(ChannelBuffer buf) throws Exception;
}

View File

@ -38,7 +38,7 @@ import java.net.SocketException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.util.Queue; import java.util.Queue;
public class OioDatagramChannel extends AbstractOioChannel public class OioDatagramChannel extends AbstractOioMessageChannel
implements DatagramChannel { implements DatagramChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class);

View File

@ -34,7 +34,7 @@ import java.util.Queue;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
public class OioServerSocketChannel extends AbstractOioChannel public class OioServerSocketChannel extends AbstractOioMessageChannel
implements ServerSocketChannel { implements ServerSocketChannel {
private static final InternalLogger logger = private static final InternalLogger logger =
@ -174,4 +174,9 @@ public class OioServerSocketChannel extends AbstractOioChannel
protected void doDisconnect() throws Exception { protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
protected int doWriteMessages(Queue<Object> buf) throws Exception {
throw new UnsupportedOperationException();
}
} }

View File

@ -34,7 +34,7 @@ import java.net.SocketAddress;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.nio.channels.NotYetConnectedException; import java.nio.channels.NotYetConnectedException;
public class OioSocketChannel extends AbstractOioChannel public class OioSocketChannel extends AbstractOioStreamChannel
implements SocketChannel { implements SocketChannel {
private static final InternalLogger logger = private static final InternalLogger logger =