Reorder methods / Move buffer expansion logic in OIO

This commit is contained in:
Trustin Lee 2012-05-27 05:31:18 -07:00
parent 7b05f34171
commit f4a19886d3
6 changed files with 93 additions and 85 deletions

View File

@ -99,32 +99,6 @@ public abstract class AbstractNioChannel extends AbstractChannel {
return selectionKey;
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof NioChildEventLoop;
}
@Override
protected boolean isFlushPending() {
return (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}
@Override
protected void doRegister() throws Exception {
NioChildEventLoop loop = (NioChildEventLoop) eventLoop();
selectionKey = javaChannel().register(
loop.selector, isActive()? defaultInterestOps : 0, this);
}
@Override
protected void doDeregister() throws Exception {
((NioChildEventLoop) eventLoop()).cancel(selectionKey());
}
protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
protected abstract void doFinishConnect() throws Exception;
public interface NioUnsafe extends Unsafe {
java.nio.channels.Channel ch();
void finishConnect();
@ -216,4 +190,29 @@ public abstract class AbstractNioChannel extends AbstractChannel {
}
}
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof NioChildEventLoop;
}
@Override
protected boolean isFlushPending() {
return (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}
@Override
protected void doRegister() throws Exception {
NioChildEventLoop loop = (NioChildEventLoop) eventLoop();
selectionKey = javaChannel().register(
loop.selector, isActive()? defaultInterestOps : 0, this);
}
@Override
protected void doDeregister() throws Exception {
((NioChildEventLoop) eventLoop()).cancel(selectionKey());
}
protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
protected abstract void doFinishConnect() throws Exception;
}

View File

@ -28,31 +28,6 @@ abstract class AbstractNioMessageChannel extends AbstractNioChannel {
return new NioMessageUnsafe();
}
@Override
protected void doFlush(ChannelBufferHolder<Object> buf) throws Exception {
flushMessageBuf(buf.messageBuffer());
}
private void flushMessageBuf(Queue<Object> buf) throws Exception {
final int writeSpinCount = config().getWriteSpinCount() - 1;
while (!buf.isEmpty()) {
boolean wrote = false;
for (int i = writeSpinCount; i >= 0; i --) {
int localFlushedAmount = doWriteMessages(buf, i == 0);
if (localFlushedAmount > 0) {
writeCounter += localFlushedAmount;
wrote = true;
notifyFlushFutures();
break;
}
}
if (!wrote) {
break;
}
}
}
private class NioMessageUnsafe extends AbstractNioUnsafe {
@Override
public void read() {
@ -95,6 +70,31 @@ abstract class AbstractNioMessageChannel extends AbstractNioChannel {
}
}
@Override
protected void doFlush(ChannelBufferHolder<Object> buf) throws Exception {
flushMessageBuf(buf.messageBuffer());
}
private void flushMessageBuf(Queue<Object> buf) throws Exception {
final int writeSpinCount = config().getWriteSpinCount() - 1;
while (!buf.isEmpty()) {
boolean wrote = false;
for (int i = writeSpinCount; i >= 0; i --) {
int localFlushedAmount = doWriteMessages(buf, i == 0);
if (localFlushedAmount > 0) {
writeCounter += localFlushedAmount;
wrote = true;
notifyFlushFutures();
break;
}
}
if (!wrote) {
break;
}
}
}
protected abstract int doReadMessages(Queue<Object> buf) throws Exception;
protected abstract int doWriteMessages(Queue<Object> buf, boolean lastSpin) throws Exception;
}

View File

@ -41,6 +41,7 @@ abstract class AbstractNioStreamChannel extends AbstractNioChannel {
boolean read = false;
try {
ChannelBuffer byteBuf = buf.byteBuffer();
expandReadBuffer(byteBuf);
for (;;) {
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount > 0) {

View File

@ -29,26 +29,6 @@ abstract class AbstractOioChannel extends AbstractChannel {
return (OioUnsafe) super.unsafe();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof OioChildEventLoop;
}
@Override
protected void doRegister() throws Exception {
// NOOP
}
@Override
protected void doDeregister() throws Exception {
// NOOP
}
@Override
protected boolean isFlushPending() {
return false;
}
public interface OioUnsafe extends Unsafe {
void read();
}
@ -87,6 +67,26 @@ abstract class AbstractOioChannel extends AbstractChannel {
}
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof OioChildEventLoop;
}
@Override
protected void doRegister() throws Exception {
// NOOP
}
@Override
protected void doDeregister() throws Exception {
// NOOP
}
@Override
protected boolean isFlushPending() {
return false;
}
protected abstract void doConnect(
SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
}

View File

@ -38,6 +38,7 @@ abstract class AbstractOioStreamChannel extends AbstractOioChannel {
boolean read = false;
try {
ChannelBuffer byteBuf = buf.byteBuffer();
expandReadBuffer(byteBuf);
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount > 0) {
read = true;
@ -80,6 +81,17 @@ abstract class AbstractOioStreamChannel extends AbstractOioChannel {
buf.clear();
}
protected abstract int available();
protected abstract int doReadBytes(ChannelBuffer buf) throws Exception;
protected abstract int doWriteBytes(ChannelBuffer buf) throws Exception;
private void expandReadBuffer(ChannelBuffer byteBuf) {
int available = available();
if (available > 0) {
byteBuf.ensureWritableBytes(available);
} else if (!byteBuf.writable()) {
// FIXME: Magic number
byteBuf.ensureWritableBytes(4096);
}
}
}

View File

@ -146,26 +146,22 @@ public class OioSocketChannel extends AbstractOioStreamChannel
socket.close();
}
@Override
protected int available() {
try {
return is.available();
} catch (IOException e) {
return 0;
}
}
@Override
protected int doReadBytes(ChannelBuffer buf) throws Exception {
if (socket.isClosed()) {
return -1;
}
try {
int available = is.available();
if (available > 0) {
buf.ensureWritableBytes(available);
} else if (!buf.writable()) {
// FIXME: Magic number
buf.ensureWritableBytes(4096);
}
int readBytes = buf.writeBytes(is, buf.writableBytes());
if (!buf.writable()) {
// FIXME: Magic number
buf.ensureWritableBytes(4096);
}
return readBytes;
return buf.writeBytes(is, buf.writableBytes());
} catch (SocketTimeoutException e) {
return 0;
}