Split AbstractChannel.doFlush() into two variants for simpler user impl

- Also renamed doRead() to doReadMessages() and doReadBytes()
This commit is contained in:
Trustin Lee 2012-05-25 15:32:28 -07:00
parent f60f918763
commit 61314ef51b
8 changed files with 37 additions and 66 deletions

View File

@ -647,7 +647,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
if (buf.hasMessageBuffer()) { if (buf.hasMessageBuffer()) {
Queue<Object> msgBuf = buf.messageBuffer(); Queue<Object> msgBuf = buf.messageBuffer();
for (;;) { for (;;) {
int localReadAmount = doRead(msgBuf); int localReadAmount = doReadMessages(msgBuf);
if (localReadAmount > 0) { if (localReadAmount > 0) {
read = true; read = true;
} else if (localReadAmount == 0) { } else if (localReadAmount == 0) {
@ -660,7 +660,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} else { } else {
ChannelBuffer byteBuf = buf.byteBuffer(); ChannelBuffer byteBuf = buf.byteBuffer();
for (;;) { for (;;) {
int localReadAmount = doRead(byteBuf); int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount > 0) { if (localReadAmount > 0) {
read = true; read = true;
} else if (localReadAmount < 0) { } else if (localReadAmount < 0) {
@ -750,8 +750,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} }
private void flushByteBuf(ChannelBuffer buf) throws Exception { private void flushByteBuf(ChannelBuffer buf) throws Exception {
if (!buf.readable()) {
// Reset reader/writerIndex to 0 if the buffer is empty.
buf.clear();
return;
}
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
int localFlushedAmount = doFlush(i == 0); int localFlushedAmount = doWriteBytes(buf, i == 0);
if (localFlushedAmount > 0) { if (localFlushedAmount > 0) {
flushedAmount += localFlushedAmount; flushedAmount += localFlushedAmount;
notifyFlushFutures(); notifyFlushFutures();
@ -770,7 +776,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
while (!buf.isEmpty()) { while (!buf.isEmpty()) {
boolean wrote = false; boolean wrote = false;
for (int i = writeSpinCount; i >= 0; i --) { for (int i = writeSpinCount; i >= 0; i --) {
int localFlushedAmount = doFlush(i == 0); int localFlushedAmount = doWriteMessages(buf, i == 0);
if (localFlushedAmount > 0) { if (localFlushedAmount > 0) {
flushedAmount += localFlushedAmount; flushedAmount += localFlushedAmount;
wrote = true; wrote = true;
@ -915,9 +921,22 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected abstract void doClose() throws Exception; protected abstract void doClose() throws Exception;
protected abstract void doDeregister() throws Exception; protected abstract void doDeregister() throws Exception;
protected abstract int doRead(Queue<Object> buf) throws Exception; protected int doReadMessages(Queue<Object> buf) throws Exception {
protected abstract int doRead(ChannelBuffer buf) throws Exception; throw new UnsupportedOperationException();
protected abstract int doFlush(boolean lastSpin) throws Exception; }
protected int doReadBytes(ChannelBuffer buf) throws Exception {
throw new UnsupportedOperationException();
}
protected int doWriteMessages(Queue<Object> buf, boolean lastSpin) throws Exception {
throw new UnsupportedOperationException();
}
protected int doWriteBytes(ChannelBuffer buf, boolean lastSpin) throws Exception {
throw new UnsupportedOperationException();
}
protected abstract boolean inEventLoopDrivenFlush(); protected abstract boolean inEventLoopDrivenFlush();
private static boolean expandReadBuffer(ChannelBuffer byteBuf) { private static boolean expandReadBuffer(ChannelBuffer byteBuf) {

View File

@ -15,8 +15,6 @@
*/ */
package io.netty.channel; package io.netty.channel;
import io.netty.buffer.ChannelBuffer;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.AbstractQueue; import java.util.AbstractQueue;
import java.util.Collections; import java.util.Collections;
@ -78,16 +76,6 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
protected int doRead(ChannelBuffer buf) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected int doFlush(boolean lastSpin) throws Exception {
throw new UnsupportedOperationException();
}
@Override @Override
protected boolean inEventLoopDrivenFlush() { protected boolean inEventLoopDrivenFlush() {
return false; return false;

View File

@ -15,7 +15,6 @@
*/ */
package io.netty.channel.socket.nio; package io.netty.channel.socket.nio;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers; import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelBufferHolders;
@ -173,7 +172,7 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n
} }
@Override @Override
protected int doRead(Queue<Object> buf) throws Exception { protected int doReadMessages(Queue<Object> buf) throws Exception {
DatagramChannel ch = javaChannel(); DatagramChannel ch = javaChannel();
ByteBuffer data = ByteBuffer.allocate(config().getReceivePacketSize()); ByteBuffer data = ByteBuffer.allocate(config().getReceivePacketSize());
InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(data); InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(data);
@ -187,17 +186,7 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n
} }
@Override @Override
protected int doRead(ChannelBuffer buf) throws Exception { protected int doWriteMessages(Queue<Object> buf, boolean lastSpin) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected int doFlush(boolean lastSpin) throws Exception {
final Queue<Object> buf = unsafe().out().messageBuffer();
if (buf.isEmpty()) {
return 0;
}
DatagramPacket packet = (DatagramPacket) buf.peek(); DatagramPacket packet = (DatagramPacket) buf.peek();
final int writtenBytes = javaChannel().send(packet.data().toByteBuffer(), packet.remoteAddress()); final int writtenBytes = javaChannel().send(packet.data().toByteBuffer(), packet.remoteAddress());

View File

@ -128,7 +128,7 @@ public class NioServerSocketChannel extends AbstractServerChannel
} }
@Override @Override
protected int doRead(Queue<Object> buf) throws Exception { protected int doReadMessages(Queue<Object> buf) throws Exception {
java.nio.channels.SocketChannel ch = javaChannel().accept(); java.nio.channels.SocketChannel ch = javaChannel().accept();
if (ch == null) { if (ch == null) {
return 0; return 0;

View File

@ -29,7 +29,6 @@ import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.Queue;
public class NioSocketChannel extends AbstractNioChannel implements io.netty.channel.socket.SocketChannel { public class NioSocketChannel extends AbstractNioChannel implements io.netty.channel.socket.SocketChannel {
@ -160,23 +159,13 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha
} }
@Override @Override
protected int doRead(ChannelBuffer byteBuf) throws Exception { protected int doReadBytes(ChannelBuffer byteBuf) throws Exception {
return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
} }
@Override @Override
protected int doRead(Queue<Object> buf) throws Exception { protected int doWriteBytes(ChannelBuffer buf, boolean lastSpin) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected int doFlush(boolean lastSpin) throws Exception {
final ChannelBuffer buf = unsafe().out().byteBuffer();
final int expectedWrittenBytes = buf.readableBytes(); final int expectedWrittenBytes = buf.readableBytes();
if (expectedWrittenBytes == 0) {
return 0;
}
final int writtenBytes = buf.readBytes(javaChannel(), expectedWrittenBytes); final int writtenBytes = buf.readBytes(javaChannel(), expectedWrittenBytes);
final SelectionKey key = selectionKey(); final SelectionKey key = selectionKey();

View File

@ -194,7 +194,7 @@ public class OioDatagramChannel extends AbstractChannel
} }
@Override @Override
protected int doRead(Queue<Object> buf) throws Exception { protected int doReadMessages(Queue<Object> buf) throws Exception {
int packetSize = config().getReceivePacketSize(); int packetSize = config().getReceivePacketSize();
byte[] data = new byte[packetSize]; byte[] data = new byte[packetSize];
tmpPacket.setData(data); tmpPacket.setData(data);
@ -214,17 +214,7 @@ public class OioDatagramChannel extends AbstractChannel
} }
@Override @Override
protected int doRead(ChannelBuffer buf) throws Exception { protected int doWriteMessages(Queue<Object> buf, boolean lastSpin) throws Exception {
throw new Error();
}
@Override
protected int doFlush(boolean lastSpin) throws Exception {
final Queue<Object> buf = unsafe().out().messageBuffer();
if (buf.isEmpty()) {
return 0;
}
DatagramPacket p = (DatagramPacket) buf.poll(); DatagramPacket p = (DatagramPacket) buf.poll();
ChannelBuffer data = p.data(); ChannelBuffer data = p.data();
int length = data.readableBytes(); int length = data.readableBytes();

View File

@ -152,7 +152,7 @@ public class OioServerSocketChannel extends AbstractServerChannel
} }
@Override @Override
protected int doRead(Queue<Object> buf) throws Exception { protected int doReadMessages(Queue<Object> buf) throws Exception {
if (socket.isClosed()) { if (socket.isClosed()) {
return -1; return -1;
} }

View File

@ -187,12 +187,12 @@ public class OioSocketChannel extends AbstractChannel
} }
@Override @Override
protected int doRead(Queue<Object> buf) throws Exception { protected int doReadMessages(Queue<Object> buf) throws Exception {
throw new Error(); throw new Error();
} }
@Override @Override
protected int doRead(ChannelBuffer buf) throws Exception { protected int doReadBytes(ChannelBuffer buf) throws Exception {
if (socket.isClosed()) { if (socket.isClosed()) {
return -1; return -1;
} }
@ -206,16 +206,12 @@ public class OioSocketChannel extends AbstractChannel
} }
@Override @Override
protected int doFlush(boolean lastSpin) throws Exception { protected int doWriteBytes(ChannelBuffer buf, boolean lastSpin) throws Exception {
OutputStream os = this.os; OutputStream os = this.os;
if (os == null) { if (os == null) {
throw new NotYetConnectedException(); throw new NotYetConnectedException();
} }
final ChannelBuffer buf = unsafe().out().byteBuffer();
final int length = buf.readableBytes(); final int length = buf.readableBytes();
if (length == 0) {
return 0;
}
buf.readBytes(os, length); buf.readBytes(os, length);
return length; return length;
} }