Split AbstractChannel into AbstractOioChannel and AbstractNioChannel

- Simpler OIO transport
- Suits better for other transports such as AIO, RXTX, IOStream
- Add ChannelBufferHolders.discardBuffer()
This commit is contained in:
Trustin Lee 2012-05-26 22:48:48 -07:00
parent a1bdf671f1
commit 6206d82b2c
15 changed files with 919 additions and 640 deletions

View File

@ -16,15 +16,14 @@
package io.netty.handler.codec.embedder;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.AbstractChannel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Queue;
@ -36,22 +35,11 @@ class EmbeddedChannel extends AbstractChannel {
private final SocketAddress remoteAddress = new EmbeddedSocketAddress();
private final Queue<Object> productQueue;
private int state; // 0 = OPEN, 1 = ACTIVE, 2 = CLOSED
private final java.nio.channels.Channel javaChannel = new java.nio.channels.Channel() {
@Override
public boolean isOpen() {
return state < 2;
}
@Override
public void close() throws IOException {
// NOOP
}
};
EmbeddedChannel(Queue<Object> productQueue) {
super(null, null);
this.productQueue = productQueue;
firstOut = ChannelBufferHolders.catchAllBuffer(productQueue, ChannelBuffers.dynamicBuffer());
firstOut = ChannelBufferHolders.catchAllBuffer();
}
@Override
@ -59,6 +47,11 @@ class EmbeddedChannel extends AbstractChannel {
return config;
}
@Override
public boolean isOpen() {
return state < 2;
}
@Override
public boolean isActive() {
return state == 1;
@ -69,11 +62,6 @@ class EmbeddedChannel extends AbstractChannel {
return loop instanceof EmbeddedEventLoop;
}
@Override
protected java.nio.channels.Channel javaChannel() {
return javaChannel;
}
@Override
@SuppressWarnings("unchecked")
protected ChannelBufferHolder<Object> firstOut() {
@ -100,16 +88,6 @@ class EmbeddedChannel extends AbstractChannel {
// NOOP
}
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
return true;
}
@Override
protected void doFinishConnect() throws Exception {
// NOOP
}
@Override
protected void doDisconnect() throws Exception {
doClose();
@ -126,16 +104,37 @@ class EmbeddedChannel extends AbstractChannel {
}
@Override
protected int doWriteBytes(ChannelBuffer buf, boolean lastSpin) throws Exception {
int length = buf.readableBytes();
if (length > 0) {
productQueue.add(buf.readBytes(length));
protected void doFlush(ChannelBufferHolder<Object> buf) throws Exception {
ChannelBuffer byteBuf = buf.byteBuffer();
int byteBufLen = byteBuf.readableBytes();
if (byteBufLen > 0) {
productQueue.add(byteBuf.readBytes(byteBufLen));
writeCounter += byteBufLen;
byteBuf.clear();
}
Queue<Object> msgBuf = buf.messageBuffer();
if (!msgBuf.isEmpty()) {
productQueue.addAll(msgBuf);
writeCounter += msgBuf.size();
msgBuf.clear();
}
return length;
}
@Override
protected boolean inEventLoopDrivenFlush() {
protected Unsafe newUnsafe() {
return new DefaultUnsafe();
}
@Override
protected boolean isFlushPending() {
return false;
}
private class DefaultUnsafe extends AbstractUnsafe {
@Override
public void connect(SocketAddress remoteAddress,
SocketAddress localAddress, ChannelFuture future) {
future.setSuccess();
}
}
}

View File

@ -15,22 +15,17 @@
*/
package io.netty.channel;
import io.netty.buffer.ChannelBuffer;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.DefaultAttributeMap;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* A skeletal {@link Channel} implementation.
@ -85,17 +80,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private volatile EventLoop eventLoop;
private volatile boolean registered;
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelFuture connectFuture;
private ScheduledFuture<?> connectTimeoutFuture;
private ConnectException connectTimeoutException;
private long flushedAmount;
private final Deque<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
private ClosedChannelException closedChannelException;
private final Deque<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
protected long writeCounter;
/** Cache for the string representation of this channel */
private boolean strValActive;
@ -125,7 +112,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
this.parent = parent;
this.id = id;
unsafe = new DefaultUnsafe();
unsafe = newUnsafe();
closeFuture().addListener(new ChannelFutureListener() {
@Override
@ -194,11 +181,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
remoteAddress = null;
}
@Override
public boolean isOpen() {
return unsafe().ch().isOpen();
}
@Override
public boolean isRegistered() {
return registered;
@ -314,6 +296,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return unsafe;
}
protected abstract Unsafe newUnsafe();
/**
* Returns the {@linkplain System#identityHashCode(Object) identity hash code}
* of this channel.
@ -376,12 +360,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return strVal;
}
private class DefaultUnsafe implements Unsafe {
@Override
public java.nio.channels.Channel ch() {
return javaChannel();
}
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public ChannelBufferHolder<Object> out() {
@ -474,85 +453,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
@Override
public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
if (!ensureOpen(future)) {
return;
}
try {
if (connectFuture != null) {
throw new IllegalStateException("connection attempt already made");
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
future.setSuccess();
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
} else {
connectFuture = future;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
if (connectTimeoutException == null) {
connectTimeoutException = new ConnectException("connection timed out");
}
ChannelFuture connectFuture = AbstractChannel.this.connectFuture;
if (connectFuture == null) {
return;
} else {
if (connectFuture.setFailure(connectTimeoutException)) {
pipeline().fireExceptionCaught(connectTimeoutException);
close(voidFuture());
}
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
}
} catch (Throwable t) {
future.setFailure(t);
pipeline().fireExceptionCaught(t);
closeIfClosed();
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
connect(remoteAddress, localAddress, future);
}
});
}
}
@Override
public void finishConnect() {
assert eventLoop().inEventLoop();
assert connectFuture != null;
try {
boolean wasActive = isActive();
doFinishConnect();
connectFuture.setSuccess();
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
} catch (Throwable t) {
connectFuture.setFailure(t);
pipeline().fireExceptionCaught(t);
closeIfClosed();
} finally {
connectTimeoutFuture.cancel(false);
connectFuture = null;
}
}
@Override
public void disconnect(final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
@ -636,67 +536,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelBufferHolder<Object> buf = pipeline().nextIn();
boolean closed = false;
boolean read = false;
try {
if (buf.hasMessageBuffer()) {
Queue<Object> msgBuf = buf.messageBuffer();
for (;;) {
int localReadAmount = doReadMessages(msgBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount == 0) {
break;
} else if (localReadAmount < 0) {
closed = true;
break;
}
}
} else {
ChannelBuffer byteBuf = buf.byteBuffer();
for (;;) {
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
break;
}
if (!expandReadBuffer(byteBuf)) {
break;
}
}
}
} catch (Throwable t) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
}
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (closed && isOpen()) {
close(voidFuture());
}
}
}
@Override
public void flush(final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
// Append flush future to the notification list.
if (future != voidFuture) {
long checkpoint = flushedAmount + out().size();
long checkpoint = writeCounter + out().size();
if (future instanceof FlushCheckpoint) {
FlushCheckpoint cp = (FlushCheckpoint) future;
cp.flushCheckpoint(checkpoint);
@ -709,10 +554,20 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
// Attempt/perform outbound I/O if:
// - the channel is inactive - flush0() will fail the futures.
// - the event loop has no plan to call flushForcibly().
if (!isActive() || !inEventLoopDrivenFlush()) {
// Note that we don't call flushForcibly() because otherwise its stack trace
// will be confusing.
flush0();
try {
if (!isActive() || !isFlushPending()) {
doFlush(out());
}
} catch (Throwable t) {
notifyFlushFutures(t);
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
if (!isActive()) {
close(unsafe().voidFuture());
}
}
} else {
eventLoop().execute(new Runnable() {
@ -724,115 +579,23 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
@Override
public void flushForcibly() {
flush0();
}
private void flush0() {
// Perform outbound I/O.
public void flushNow() {
try {
ChannelBufferHolder<Object> out = out();
if (out.hasByteBuffer()) {
flushByteBuf(out.byteBuffer());
} else {
flushMessageBuf(out.messageBuffer());
}
doFlush(out());
} catch (Throwable t) {
notifyFlushFutures(t);
pipeline().fireExceptionCaught(t);
close(voidFuture());
} finally {
if (!isActive()) {
if (t instanceof IOException) {
close(voidFuture());
}
}
}
private void flushByteBuf(ChannelBuffer buf) throws Exception {
if (!buf.readable()) {
// Reset reader/writerIndex to 0 if the buffer is empty.
buf.clear();
return;
}
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf, i == 0);
if (localFlushedAmount > 0) {
flushedAmount += localFlushedAmount;
notifyFlushFutures();
break;
}
if (!buf.readable()) {
// Reset reader/writerIndex to 0 if the buffer is empty.
buf.clear();
break;
} finally {
if (!isActive()) {
close(unsafe().voidFuture());
}
}
}
private void flushMessageBuf(Queue<Object> buf) throws Exception {
final int writeSpinCount = config().getWriteSpinCount() - 1;
while (!buf.isEmpty()) {
boolean wrote = false;
for (int i = writeSpinCount; i >= 0; i --) {
int localFlushedAmount = doWriteMessages(buf, i == 0);
if (localFlushedAmount > 0) {
flushedAmount += localFlushedAmount;
wrote = true;
notifyFlushFutures();
break;
}
}
if (!wrote) {
break;
}
}
}
private void notifyFlushFutures() {
if (flushCheckpoints.isEmpty()) {
return;
}
final long flushedAmount = AbstractChannel.this.flushedAmount;
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) {
break;
}
if (cp.flushCheckpoint() > flushedAmount) {
break;
}
cp.future().setSuccess();
}
// Avoid overflow
if (flushCheckpoints.isEmpty()) {
// Reset the counter if there's nothing in the notification list.
AbstractChannel.this.flushedAmount = 0;
} else if (flushedAmount >= 0x1000000000000000L) {
// Otherwise, reset the counter only when the counter grew pretty large
// so that we can reduce the cost of updating all entries in the notification list.
AbstractChannel.this.flushedAmount = 0;
for (FlushCheckpoint cp: flushCheckpoints) {
cp.flushCheckpoint(cp.flushCheckpoint() - flushedAmount);
}
}
}
private void notifyFlushFutures(Throwable cause) {
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) {
break;
}
cp.future().setFailure(cause);
}
}
private boolean ensureOpen(ChannelFuture future) {
protected boolean ensureOpen(ChannelFuture future) {
if (isOpen()) {
return true;
}
@ -843,7 +606,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return false;
}
private void closeIfClosed() {
protected void closeIfClosed() {
if (isOpen()) {
return;
}
@ -851,6 +614,63 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
protected abstract boolean isCompatible(EventLoop loop);
protected abstract ChannelBufferHolder<Object> firstOut();
protected abstract SocketAddress localAddress0();
protected abstract SocketAddress remoteAddress0();
protected abstract void doRegister() throws Exception;
protected abstract void doBind(SocketAddress localAddress) throws Exception;
protected abstract void doDisconnect() throws Exception;
protected abstract void doClose() throws Exception;
protected abstract void doDeregister() throws Exception;
protected abstract void doFlush(ChannelBufferHolder<Object> buf) throws Exception;
protected abstract boolean isFlushPending();
protected void notifyFlushFutures() {
if (flushCheckpoints.isEmpty()) {
return;
}
final long flushedAmount = AbstractChannel.this.writeCounter;
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) {
break;
}
if (cp.flushCheckpoint() > flushedAmount) {
break;
}
cp.future().setSuccess();
}
// Avoid overflow
if (flushCheckpoints.isEmpty()) {
// Reset the counter if there's nothing in the notification list.
AbstractChannel.this.writeCounter = 0;
} else if (flushedAmount >= 0x1000000000000000L) {
// Otherwise, reset the counter only when the counter grew pretty large
// so that we can reduce the cost of updating all entries in the notification list.
AbstractChannel.this.writeCounter = 0;
for (FlushCheckpoint cp: flushCheckpoints) {
cp.flushCheckpoint(cp.flushCheckpoint() - flushedAmount);
}
}
}
protected void notifyFlushFutures(Throwable cause) {
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) {
break;
}
cp.future().setFailure(cause);
}
}
static abstract class FlushCheckpoint {
abstract long flushCheckpoint();
abstract void flushCheckpoint(long checkpoint);
@ -904,48 +724,4 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return set;
}
}
protected abstract boolean isCompatible(EventLoop loop);
protected abstract java.nio.channels.Channel javaChannel();
protected abstract ChannelBufferHolder<Object> firstOut();
protected abstract SocketAddress localAddress0();
protected abstract SocketAddress remoteAddress0();
protected abstract void doRegister() throws Exception;
protected abstract void doBind(SocketAddress localAddress) throws Exception;
protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
protected abstract void doFinishConnect() throws Exception;
protected abstract void doDisconnect() throws Exception;
protected abstract void doClose() throws Exception;
protected abstract void doDeregister() throws Exception;
protected int doReadMessages(Queue<Object> buf) throws Exception {
throw new UnsupportedOperationException();
}
protected int doReadBytes(ChannelBuffer buf) throws Exception {
throw new UnsupportedOperationException();
}
protected int doWriteMessages(Queue<Object> buf, boolean lastSpin) throws Exception {
throw new UnsupportedOperationException();
}
protected int doWriteBytes(ChannelBuffer buf, boolean lastSpin) throws Exception {
throw new UnsupportedOperationException();
}
protected abstract boolean inEventLoopDrivenFlush();
private static boolean expandReadBuffer(ChannelBuffer byteBuf) {
if (!byteBuf.writable()) {
// FIXME: Use a sensible value.
byteBuf.ensureWritableBytes(4096);
return true;
}
return false;
}
}

View File

@ -16,9 +16,6 @@
package io.netty.channel;
import java.net.SocketAddress;
import java.util.AbstractQueue;
import java.util.Collections;
import java.util.Iterator;
/**
* A skeletal server-side {@link Channel} implementation. A server-side
@ -32,8 +29,6 @@ import java.util.Iterator;
*/
public abstract class AbstractServerChannel extends AbstractChannel implements ServerChannel {
private final ChannelBufferHolder<Object> out = ChannelBufferHolders.messageBuffer(new NoopQueue());
/**
* Creates a new instance.
*/
@ -43,7 +38,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
@Override
public ChannelBufferHolder<Object> out() {
return out;
return ChannelBufferHolders.discardBuffer();
}
@Override
@ -53,7 +48,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
@Override
protected ChannelBufferHolder<Object> firstOut() {
return out;
return ChannelBufferHolders.discardBuffer();
}
@Override
@ -61,50 +56,8 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
return null;
}
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doFinishConnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected boolean inEventLoopDrivenFlush() {
return false;
}
private static class NoopQueue extends AbstractQueue<Object> {
@Override
public boolean offer(Object e) {
return false;
}
@Override
public Object poll() {
return null;
}
@Override
public Object peek() {
return null;
}
@Override
public Iterator<Object> iterator() {
return Collections.emptyList().iterator();
}
@Override
public int size() {
return 0;
}
}
}

View File

@ -172,7 +172,6 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu
Unsafe unsafe();
public interface Unsafe {
java.nio.channels.Channel ch();
ChannelBufferHolder<Object> out();
ChannelFuture voidFuture();
@ -182,13 +181,11 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu
void register(EventLoop eventLoop, ChannelFuture future);
void bind(SocketAddress localAddress, ChannelFuture future);
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future);
void finishConnect();
void disconnect(ChannelFuture future);
void close(ChannelFuture future);
void deregister(ChannelFuture future);
void read();
void flush(ChannelFuture future);
void flushForcibly();
void flushNow();
}
}

View File

@ -1,13 +1,29 @@
package io.netty.channel;
import io.netty.buffer.AbstractChannelBuffer;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.buffer.ChannelBuffers;
import io.netty.buffer.HeapChannelBufferFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.util.AbstractQueue;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Iterator;
import java.util.Queue;
public final class ChannelBufferHolders {
private static final ChannelBufferHolder<Object> DISCARD_BUFFER =
new ChannelBufferHolder<Object>(new NoopQueue<Object>(), new NoopByteBuf());
public static <E> ChannelBufferHolder<E> messageBuffer() {
return messageBuffer(new ArrayDeque<E>());
}
@ -41,7 +57,202 @@ public final class ChannelBufferHolders {
return new ChannelBufferHolder<E>(msgBuf, byteBuf);
}
@SuppressWarnings("unchecked")
public static <E> ChannelBufferHolder<E> discardBuffer() {
return (ChannelBufferHolder<E>) DISCARD_BUFFER;
}
private ChannelBufferHolders() {
// Utility class
}
private static class NoopQueue<E> extends AbstractQueue<E> {
@Override
public boolean offer(Object e) {
return false;
}
@Override
public E poll() {
return null;
}
@Override
public E peek() {
return null;
}
@Override
public Iterator<E> iterator() {
return (Iterator<E>) Collections.emptyList().iterator();
}
@Override
public int size() {
return 0;
}
}
private static class NoopByteBuf extends AbstractChannelBuffer {
@Override
public ChannelBufferFactory factory() {
return HeapChannelBufferFactory.getInstance();
}
@Override
public int capacity() {
return 0;
}
@Override
public ByteOrder order() {
return ByteOrder.BIG_ENDIAN;
}
@Override
public boolean isDirect() {
return false;
}
@Override
public byte getByte(int index) {
return 0;
}
@Override
public short getShort(int index) {
return 0;
}
@Override
public int getUnsignedMedium(int index) {
return 0;
}
@Override
public int getInt(int index) {
return 0;
}
@Override
public long getLong(int index) {
return 0;
}
@Override
public void getBytes(int index, ChannelBuffer dst, int dstIndex, int length) {
// NOOP
}
@Override
public void getBytes(int index, byte[] dst, int dstIndex, int length) {
// NOOP
}
@Override
public void getBytes(int index, ByteBuffer dst) {
// NOOP
}
@Override
public void getBytes(int index, OutputStream out, int length)
throws IOException {
// NOOP
}
@Override
public int getBytes(int index, GatheringByteChannel out, int length)
throws IOException {
return 0;
}
@Override
public void setByte(int index, int value) {
// NOOP
}
@Override
public void setShort(int index, int value) {
// NOOP
}
@Override
public void setMedium(int index, int value) {
// NOOP
}
@Override
public void setInt(int index, int value) {
// NOOP
}
@Override
public void setLong(int index, long value) {
// NOOP
}
@Override
public void setBytes(int index, ChannelBuffer src, int srcIndex,
int length) {
// NOOP
}
@Override
public void setBytes(int index, byte[] src, int srcIndex, int length) {
// NOOP
}
@Override
public void setBytes(int index, ByteBuffer src) {
// NOOP
}
@Override
public int setBytes(int index, InputStream in, int length)
throws IOException {
return 0;
}
@Override
public int setBytes(int index, ScatteringByteChannel in, int length)
throws IOException {
return 0;
}
@Override
public ChannelBuffer copy(int index, int length) {
return this;
}
@Override
public ChannelBuffer slice(int index, int length) {
return this;
}
@Override
public ChannelBuffer duplicate() {
return this;
}
@Override
public ByteBuffer toByteBuffer(int index, int length) {
return ByteBuffer.allocate(0);
}
@Override
public boolean hasArray() {
return true;
}
@Override
public byte[] array() {
return new byte[0];
}
@Override
public int arrayOffset() {
return 0;
}
}
}

View File

@ -15,22 +15,68 @@
*/
package io.netty.channel.socket.nio;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.Queue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public abstract class AbstractNioChannel extends AbstractChannel {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(AbstractNioChannel.class);
private final SelectableChannel ch;
private final int defaultInterestOps;
private volatile SelectionKey selectionKey;
protected AbstractNioChannel(Channel parent, Integer id, SelectableChannel ch) {
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelFuture connectFuture;
private ScheduledFuture<?> connectTimeoutFuture;
private ConnectException connectTimeoutException;
protected AbstractNioChannel(Channel parent, Integer id, SelectableChannel ch, int defaultInterestOps) {
super(parent, id);
this.ch = ch;
this.defaultInterestOps = defaultInterestOps;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
@Override
public boolean isOpen() {
return ch.isOpen();
}
@Override
@ -44,6 +90,15 @@ public abstract class AbstractNioChannel extends AbstractChannel {
}
@Override
public NioUnsafe unsafe() {
return (NioUnsafe) super.unsafe();
}
@Override
protected NioUnsafe newUnsafe() {
return new DefaultNioUnsafe();
}
protected SelectableChannel javaChannel() {
return ch;
}
@ -59,13 +114,249 @@ public abstract class AbstractNioChannel extends AbstractChannel {
}
@Override
protected void doRegister() throws Exception {
NioChildEventLoop loop = (NioChildEventLoop) eventLoop();
selectionKey = javaChannel().register(loop.selector, isActive()? SelectionKey.OP_READ : 0, this);
protected boolean isFlushPending() {
return (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}
@Override
protected boolean inEventLoopDrivenFlush() {
return (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
protected void doRegister() throws Exception {
NioChildEventLoop loop = (NioChildEventLoop) eventLoop();
selectionKey = javaChannel().register(
loop.selector, isActive()? defaultInterestOps : 0, this);
}
@Override
protected void doDeregister() throws Exception {
selectionKey().cancel();
((NioChildEventLoop) eventLoop()).cancelledKeys ++;
}
protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
protected abstract void doFinishConnect() throws Exception;
protected int doReadMessages(Queue<Object> buf) throws Exception {
throw new UnsupportedOperationException();
}
protected int doReadBytes(ChannelBuffer buf) throws Exception {
throw new UnsupportedOperationException();
}
protected int doWriteMessages(Queue<Object> buf, boolean lastSpin) throws Exception {
throw new UnsupportedOperationException();
}
protected int doWriteBytes(ChannelBuffer buf, boolean lastSpin) throws Exception {
throw new UnsupportedOperationException();
}
public interface NioUnsafe extends Unsafe {
java.nio.channels.Channel ch();
void finishConnect();
void read();
}
private class DefaultNioUnsafe extends AbstractUnsafe implements NioUnsafe {
@Override
public java.nio.channels.Channel ch() {
return javaChannel();
}
@Override
public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
if (!ensureOpen(future)) {
return;
}
try {
if (connectFuture != null) {
throw new IllegalStateException("connection attempt already made");
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
future.setSuccess();
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
} else {
connectFuture = future;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
if (connectTimeoutException == null) {
connectTimeoutException = new ConnectException("connection timed out");
}
ChannelFuture connectFuture = AbstractNioChannel.this.connectFuture;
if (connectFuture == null) {
return;
} else {
if (connectFuture.setFailure(connectTimeoutException)) {
pipeline().fireExceptionCaught(connectTimeoutException);
close(voidFuture());
}
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
}
} catch (Throwable t) {
future.setFailure(t);
pipeline().fireExceptionCaught(t);
closeIfClosed();
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
connect(remoteAddress, localAddress, future);
}
});
}
}
@Override
public void finishConnect() {
assert eventLoop().inEventLoop();
assert connectFuture != null;
try {
boolean wasActive = isActive();
doFinishConnect();
connectFuture.setSuccess();
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
} catch (Throwable t) {
connectFuture.setFailure(t);
pipeline().fireExceptionCaught(t);
closeIfClosed();
} finally {
connectTimeoutFuture.cancel(false);
connectFuture = null;
}
}
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final ChannelBufferHolder<Object> buf = pipeline.nextIn();
boolean closed = false;
boolean read = false;
try {
if (buf.hasMessageBuffer()) {
Queue<Object> msgBuf = buf.messageBuffer();
for (;;) {
int localReadAmount = doReadMessages(msgBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount == 0) {
break;
} else if (localReadAmount < 0) {
closed = true;
break;
}
}
} else {
ChannelBuffer byteBuf = buf.byteBuffer();
for (;;) {
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
break;
}
if (!expandReadBuffer(byteBuf)) {
break;
}
}
}
} catch (Throwable t) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
}
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (closed && isOpen()) {
close(voidFuture());
}
}
}
}
@Override
protected void doFlush(ChannelBufferHolder<Object> buf) throws Exception {
if (buf.hasByteBuffer()) {
flushByteBuf(buf.byteBuffer());
} else {
flushMessageBuf(buf.messageBuffer());
}
}
private void flushByteBuf(ChannelBuffer buf) throws Exception {
if (!buf.readable()) {
// Reset reader/writerIndex to 0 if the buffer is empty.
buf.clear();
return;
}
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf, i == 0);
if (localFlushedAmount > 0) {
writeCounter += localFlushedAmount;
notifyFlushFutures();
break;
}
if (!buf.readable()) {
// Reset reader/writerIndex to 0 if the buffer is empty.
buf.clear();
break;
}
}
}
private void flushMessageBuf(Queue<Object> buf) throws Exception {
final int writeSpinCount = config().getWriteSpinCount() - 1;
while (!buf.isEmpty()) {
boolean wrote = false;
for (int i = writeSpinCount; i >= 0; i --) {
int localFlushedAmount = doWriteMessages(buf, i == 0);
if (localFlushedAmount > 0) {
writeCounter += localFlushedAmount;
wrote = true;
notifyFlushFutures();
break;
}
}
if (!wrote) {
break;
}
}
}
private static boolean expandReadBuffer(ChannelBuffer byteBuf) {
if (!byteBuf.writable()) {
// FIXME: Use a sensible value.
byteBuf.ensureWritableBytes(4096);
return true;
}
return false;
}
}

View File

@ -16,9 +16,9 @@
package io.netty.channel.socket.nio;
import io.netty.channel.Channel;
import io.netty.channel.Channel.Unsafe;
import io.netty.channel.ChannelException;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.socket.nio.AbstractNioChannel.NioUnsafe;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
@ -171,8 +171,8 @@ final class NioChildEventLoop extends SingleThreadEventLoop {
for (i = selectedKeys.iterator(); i.hasNext();) {
final SelectionKey k = i.next();
i.remove();
final Channel ch = (Channel) k.attachment();
final Unsafe unsafe = ch.unsafe();
final AbstractNioChannel ch = (AbstractNioChannel) k.attachment();
final NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
@ -183,7 +183,7 @@ final class NioChildEventLoop extends SingleThreadEventLoop {
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
unsafe.flushForcibly();
unsafe.flushNow();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
unsafe.finishConnect();

View File

@ -22,8 +22,6 @@ import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.DetectionUtil;
import java.io.IOException;
@ -48,8 +46,6 @@ import java.util.Queue;
*/
public final class NioDatagramChannel extends AbstractNioChannel implements io.netty.channel.socket.DatagramChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioDatagramChannel.class);
private final DatagramChannelConfig config;
private final Map<InetAddress, List<MembershipKey>> memberships =
new HashMap<InetAddress, List<MembershipKey>>();
@ -72,25 +68,8 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n
}
public NioDatagramChannel(Integer id, DatagramChannel socket) {
super(null, id, socket);
try {
socket.configureBlocking(false);
} catch (IOException e) {
try {
socket.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
super(null, id, socket, SelectionKey.OP_READ);
config = new NioDatagramChannelConfig(socket);
}
@Override
@ -165,12 +144,6 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n
javaChannel().close();
}
@Override
protected void doDeregister() throws Exception {
selectionKey().cancel();
((NioChildEventLoop) eventLoop()).cancelledKeys ++;
}
@Override
protected int doReadMessages(Queue<Object> buf) throws Exception {
DatagramChannel ch = javaChannel();

View File

@ -15,13 +15,11 @@
*/
package io.netty.channel.socket.nio;
import io.netty.channel.AbstractServerChannel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.DefaultServerSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannelConfig;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -30,43 +28,23 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.util.Queue;
public class NioServerSocketChannel extends AbstractServerChannel
public class NioServerSocketChannel extends AbstractNioChannel
implements io.netty.channel.socket.ServerSocketChannel {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioServerSocketChannel.class);
private final ServerSocketChannel socket;
private final ServerSocketChannelConfig config;
private volatile SelectionKey selectionKey;
public NioServerSocketChannel() {
super(null);
private static ServerSocketChannel newSocket() {
try {
socket = ServerSocketChannel.open();
return ServerSocketChannel.open();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
try {
socket.configureBlocking(false);
} catch (IOException e) {
try {
socket.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
private final ServerSocketChannelConfig config;
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
config = new DefaultServerSocketChannelConfig(socket.socket());
public NioServerSocketChannel() {
super(null, null, newSocket(), SelectionKey.OP_ACCEPT);
config = new DefaultServerSocketChannelConfig(javaChannel().socket());
}
@Override
@ -79,11 +57,6 @@ public class NioServerSocketChannel extends AbstractServerChannel
return javaChannel().socket().isBound();
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override
public InetSocketAddress remoteAddress() {
return null;
@ -91,29 +64,18 @@ public class NioServerSocketChannel extends AbstractServerChannel
@Override
protected java.nio.channels.ServerSocketChannel javaChannel() {
return socket;
return (ServerSocketChannel) super.javaChannel();
}
@Override
protected SocketAddress localAddress0() {
return socket.socket().getLocalSocketAddress();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof NioChildEventLoop;
}
@Override
protected void doRegister() throws Exception {
NioChildEventLoop loop = (NioChildEventLoop) eventLoop();
selectionKey = javaChannel().register(
loop.selector, isActive()? SelectionKey.OP_ACCEPT : 0, this);
return javaChannel().socket().getLocalSocketAddress();
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress);
SelectionKey selectionKey = selectionKey();
selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_ACCEPT);
}
@ -122,11 +84,6 @@ public class NioServerSocketChannel extends AbstractServerChannel
javaChannel().close();
}
@Override
protected void doDeregister() throws Exception {
selectionKey.cancel();
}
@Override
protected int doReadMessages(Queue<Object> buf) throws Exception {
java.nio.channels.SocketChannel ch = javaChannel().accept();
@ -136,4 +93,31 @@ public class NioServerSocketChannel extends AbstractServerChannel
buf.add(new NioSocketChannel(this, null, ch));
return 1;
}
// Unnecessary stuff
@Override
protected boolean doConnect(
SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doFinishConnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected ChannelBufferHolder<Object> firstOut() {
return ChannelBufferHolders.discardBuffer();
}
@Override
protected SocketAddress remoteAddress0() {
return null;
}
@Override
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}
}

View File

@ -54,7 +54,7 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha
}
public NioSocketChannel(Channel parent, Integer id, SocketChannel socket) {
super(parent, id, socket);
super(parent, id, socket, SelectionKey.OP_READ);
try {
socket.configureBlocking(false);
} catch (IOException e) {
@ -152,12 +152,6 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha
javaChannel().close();
}
@Override
protected void doDeregister() throws Exception {
selectionKey().cancel();
((NioChildEventLoop) eventLoop()).cancelledKeys ++;
}
@Override
protected int doReadBytes(ChannelBuffer byteBuf) throws Exception {
return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());

View File

@ -0,0 +1,193 @@
package io.netty.channel.socket.oio;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Queue;
abstract class AbstractOioChannel extends AbstractChannel {
protected AbstractOioChannel(Channel parent, Integer id) {
super(parent, id);
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
}
@Override
public OioUnsafe unsafe() {
return (OioUnsafe) super.unsafe();
}
@Override
protected OioUnsafe newUnsafe() {
return new DefaultOioUnsafe();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof OioChildEventLoop;
}
@Override
protected void doRegister() throws Exception {
// NOOP
}
@Override
protected void doDeregister() throws Exception {
// NOOP
}
@Override
protected boolean isFlushPending() {
return false;
}
public interface OioUnsafe extends Unsafe {
void read();
}
private class DefaultOioUnsafe extends AbstractUnsafe implements OioUnsafe {
@Override
public void connect(
final SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
if (!ensureOpen(future)) {
return;
}
try {
boolean wasActive = isActive();
doConnect(remoteAddress, localAddress);
future.setSuccess();
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
} catch (Throwable t) {
future.setFailure(t);
pipeline().fireExceptionCaught(t);
closeIfClosed();
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
connect(remoteAddress, localAddress, future);
}
});
}
}
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final ChannelBufferHolder<Object> buf = pipeline.nextIn();
boolean closed = false;
boolean read = false;
try {
if (buf.hasMessageBuffer()) {
Queue<Object> msgBuf = buf.messageBuffer();
int localReadAmount = doReadMessages(msgBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
}
} else {
ChannelBuffer byteBuf = buf.byteBuffer();
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
}
}
} catch (Throwable t) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
}
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (closed && isOpen()) {
close(voidFuture());
}
}
}
}
protected abstract void doConnect(
SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
protected int doReadMessages(Queue<Object> buf) throws Exception {
throw new UnsupportedOperationException();
}
protected int doReadBytes(ChannelBuffer buf) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doFlush(ChannelBufferHolder<Object> buf) throws Exception {
if (buf.hasByteBuffer()) {
flushByteBuf(buf.byteBuffer());
} else {
flushMessageBuf(buf.messageBuffer());
}
}
private void flushByteBuf(ChannelBuffer buf) throws Exception {
while (buf.readable()) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount > 0) {
writeCounter += localFlushedAmount;
notifyFlushFutures();
}
}
buf.clear();
}
private void flushMessageBuf(Queue<Object> buf) throws Exception {
while (!buf.isEmpty()) {
int localFlushedAmount = doWriteMessages(buf);
if (localFlushedAmount > 0) {
writeCounter += localFlushedAmount;
notifyFlushFutures();
}
}
}
protected int doWriteMessages(Queue<Object> buf) throws Exception {
throw new UnsupportedOperationException();
}
protected int doWriteBytes(ChannelBuffer buf) throws Exception {
throw new UnsupportedOperationException();
}
}

View File

@ -9,7 +9,7 @@ import io.netty.channel.SingleThreadEventLoop;
class OioChildEventLoop extends SingleThreadEventLoop {
private final OioEventLoop parent;
private Channel ch;
private AbstractOioChannel ch;
OioChildEventLoop(OioEventLoop parent) {
super(parent.threadFactory);
@ -22,7 +22,7 @@ class OioChildEventLoop extends SingleThreadEventLoop {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ch = future.channel();
ch = (AbstractOioChannel) future.channel();
} else {
deregister();
}
@ -33,7 +33,7 @@ class OioChildEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
for (;;) {
Channel ch = OioChildEventLoop.this.ch;
AbstractOioChannel ch = OioChildEventLoop.this.ch;
if (ch == null || !ch.isActive()) {
Runnable task;
try {

View File

@ -17,12 +17,10 @@ package io.netty.channel.socket.oio;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.AbstractChannel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket;
@ -38,10 +36,9 @@ import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.channels.Channel;
import java.util.Queue;
public class OioDatagramChannel extends AbstractChannel
public class OioDatagramChannel extends AbstractOioChannel
implements DatagramChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class);
@ -95,16 +92,6 @@ public class OioDatagramChannel extends AbstractChannel
return config;
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
}
@Override
public boolean isOpen() {
return !socket.isClosed();
@ -115,16 +102,6 @@ public class OioDatagramChannel extends AbstractChannel
return isOpen() && socket.isBound();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof OioChildEventLoop;
}
@Override
protected Channel javaChannel() {
return null;
}
@Override
protected ChannelBufferHolder<Object> firstOut() {
return out;
@ -140,18 +117,13 @@ public class OioDatagramChannel extends AbstractChannel
return socket.getRemoteSocketAddress();
}
@Override
protected void doRegister() throws Exception {
// NOOP
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
socket.bind(localAddress);
}
@Override
protected boolean doConnect(SocketAddress remoteAddress,
protected void doConnect(SocketAddress remoteAddress,
SocketAddress localAddress) throws Exception {
if (localAddress != null) {
socket.bind(localAddress);
@ -161,7 +133,6 @@ public class OioDatagramChannel extends AbstractChannel
try {
socket.connect(remoteAddress);
success = true;
return true;
} finally {
if (!success) {
try {
@ -173,11 +144,6 @@ public class OioDatagramChannel extends AbstractChannel
}
}
@Override
protected void doFinishConnect() throws Exception {
throw new Error();
}
@Override
protected void doDisconnect() throws Exception {
socket.disconnect();
@ -188,11 +154,6 @@ public class OioDatagramChannel extends AbstractChannel
socket.close();
}
@Override
protected void doDeregister() throws Exception {
// NOOP
}
@Override
protected int doReadMessages(Queue<Object> buf) throws Exception {
int packetSize = config().getReceivePacketSize();
@ -214,7 +175,7 @@ public class OioDatagramChannel extends AbstractChannel
}
@Override
protected int doWriteMessages(Queue<Object> buf, boolean lastSpin) throws Exception {
protected int doWriteMessages(Queue<Object> buf) throws Exception {
DatagramPacket p = (DatagramPacket) buf.poll();
ChannelBuffer data = p.data();
int length = data.readableBytes();
@ -231,11 +192,6 @@ public class OioDatagramChannel extends AbstractChannel
return 1;
}
@Override
protected boolean inEventLoopDrivenFlush() {
return false;
}
@Override
public ChannelFuture joinGroup(InetAddress multicastAddress) {
return joinGroup(multicastAddress, newFuture());

View File

@ -15,9 +15,9 @@
*/
package io.netty.channel.socket.oio;
import io.netty.channel.AbstractServerChannel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.DefaultServerSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.ServerSocketChannelConfig;
@ -30,12 +30,11 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.Channel;
import java.util.Queue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class OioServerSocketChannel extends AbstractServerChannel
public class OioServerSocketChannel extends AbstractOioChannel
implements ServerSocketChannel {
private static final InternalLogger logger =
@ -62,7 +61,7 @@ public class OioServerSocketChannel extends AbstractServerChannel
}
public OioServerSocketChannel(Integer id, ServerSocket socket) {
super(id);
super(null, id);
if (socket == null) {
throw new NullPointerException("socket");
}
@ -96,14 +95,9 @@ public class OioServerSocketChannel extends AbstractServerChannel
return config;
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
return null;
}
@Override
@ -116,26 +110,11 @@ public class OioServerSocketChannel extends AbstractServerChannel
return isOpen() && socket.isBound();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof OioChildEventLoop;
}
@Override
protected Channel javaChannel() {
return null;
}
@Override
protected SocketAddress localAddress0() {
return socket.getLocalSocketAddress();
}
@Override
protected void doRegister() throws Exception {
// NOOP
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
socket.bind(localAddress);
@ -146,11 +125,6 @@ public class OioServerSocketChannel extends AbstractServerChannel
socket.close();
}
@Override
protected void doDeregister() throws Exception {
// NOOP
}
@Override
protected int doReadMessages(Queue<Object> buf) throws Exception {
if (socket.isClosed()) {
@ -179,4 +153,25 @@ public class OioServerSocketChannel extends AbstractServerChannel
return 0;
}
@Override
protected void doConnect(
SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected ChannelBufferHolder<Object> firstOut() {
return ChannelBufferHolders.discardBuffer();
}
@Override
protected SocketAddress remoteAddress0() {
return null;
}
@Override
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}
}

View File

@ -16,12 +16,10 @@
package io.netty.channel.socket.oio;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
@ -29,16 +27,14 @@ import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.io.PushbackInputStream;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.NotYetConnectedException;
import java.util.Queue;
public class OioSocketChannel extends AbstractChannel
public class OioSocketChannel extends AbstractOioChannel
implements SocketChannel {
private static final InternalLogger logger =
@ -47,7 +43,7 @@ public class OioSocketChannel extends AbstractChannel
private final Socket socket;
private final SocketChannelConfig config;
private final ChannelBufferHolder<?> out = ChannelBufferHolders.byteBuffer();
private InputStream is;
private PushbackInputStream is;
private OutputStream os;
public OioSocketChannel() {
@ -66,7 +62,7 @@ public class OioSocketChannel extends AbstractChannel
boolean success = false;
try {
if (socket.isConnected()) {
is = socket.getInputStream();
is = new PushbackInputStream(socket.getInputStream());
os = socket.getOutputStream();
}
socket.setSoTimeout(1000);
@ -89,16 +85,6 @@ public class OioSocketChannel extends AbstractChannel
return config;
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
}
@Override
public boolean isOpen() {
return !socket.isClosed();
@ -109,16 +95,6 @@ public class OioSocketChannel extends AbstractChannel
return !socket.isClosed() && socket.isConnected();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof OioChildEventLoop;
}
@Override
protected java.nio.channels.Channel javaChannel() {
return null;
}
@Override
@SuppressWarnings("unchecked")
protected ChannelBufferHolder<Object> firstOut() {
@ -135,18 +111,13 @@ public class OioSocketChannel extends AbstractChannel
return socket.getRemoteSocketAddress();
}
@Override
protected void doRegister() throws Exception {
// NOOP
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
socket.bind(localAddress);
}
@Override
protected boolean doConnect(SocketAddress remoteAddress,
protected void doConnect(SocketAddress remoteAddress,
SocketAddress localAddress) throws Exception {
if (localAddress != null) {
socket.bind(localAddress);
@ -155,10 +126,9 @@ public class OioSocketChannel extends AbstractChannel
boolean success = false;
try {
socket.connect(remoteAddress, config().getConnectTimeoutMillis());
is = socket.getInputStream();
is = new PushbackInputStream(socket.getInputStream());
os = socket.getOutputStream();
success = true;
return true;
} finally {
if (!success) {
doClose();
@ -166,11 +136,6 @@ public class OioSocketChannel extends AbstractChannel
}
}
@Override
protected void doFinishConnect() throws Exception {
throw new Error();
}
@Override
protected void doDisconnect() throws Exception {
doClose();
@ -181,32 +146,29 @@ public class OioSocketChannel extends AbstractChannel
socket.close();
}
@Override
protected void doDeregister() throws Exception {
// NOOP
}
@Override
protected int doReadMessages(Queue<Object> buf) throws Exception {
throw new Error();
}
@Override
protected int doReadBytes(ChannelBuffer buf) throws Exception {
if (socket.isClosed()) {
return -1;
}
int b;
try {
int readBytes = buf.writeBytes(is, buf.writableBytes());
return readBytes;
b = is.read();
if (b < 0) {
return -1;
}
is.unread(b);
int available = is.available();
buf.ensureWritableBytes(available);
return buf.writeBytes(is, available);
} catch (SocketTimeoutException e) {
// Expected
return 0;
}
}
@Override
protected int doWriteBytes(ChannelBuffer buf, boolean lastSpin) throws Exception {
protected int doWriteBytes(ChannelBuffer buf) throws Exception {
OutputStream os = this.os;
if (os == null) {
throw new NotYetConnectedException();
@ -215,9 +177,4 @@ public class OioSocketChannel extends AbstractChannel
buf.readBytes(os, length);
return length;
}
@Override
protected boolean inEventLoopDrivenFlush() {
return false;
}
}