From 6206d82b2c670aa58537c686ce9144ba94201cf6 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Sat, 26 May 2012 22:48:48 -0700 Subject: [PATCH] Split AbstractChannel into AbstractOioChannel and AbstractNioChannel - Simpler OIO transport - Suits better for other transports such as AIO, RXTX, IOStream - Add ChannelBufferHolders.discardBuffer() --- .../codec/embedder/EmbeddedChannel.java | 69 ++- .../io/netty/channel/AbstractChannel.java | 396 ++++-------------- .../netty/channel/AbstractServerChannel.java | 51 +-- .../main/java/io/netty/channel/Channel.java | 5 +- .../netty/channel/ChannelBufferHolders.java | 211 ++++++++++ .../socket/nio/AbstractNioChannel.java | 303 +++++++++++++- .../channel/socket/nio/NioChildEventLoop.java | 8 +- .../socket/nio/NioDatagramChannel.java | 29 +- .../socket/nio/NioServerSocketChannel.java | 96 ++--- .../channel/socket/nio/NioSocketChannel.java | 8 +- .../socket/oio/AbstractOioChannel.java | 193 +++++++++ .../channel/socket/oio/OioChildEventLoop.java | 6 +- .../socket/oio/OioDatagramChannel.java | 50 +-- .../socket/oio/OioServerSocketChannel.java | 57 ++- .../channel/socket/oio/OioSocketChannel.java | 77 +--- 15 files changed, 919 insertions(+), 640 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedChannel.java b/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedChannel.java index b6d1127bf9..5e1420c415 100644 --- a/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedChannel.java +++ b/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedChannel.java @@ -16,15 +16,14 @@ package io.netty.handler.codec.embedder; import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBuffers; import io.netty.channel.AbstractChannel; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelFuture; import io.netty.channel.DefaultChannelConfig; import io.netty.channel.EventLoop; -import java.io.IOException; import java.net.SocketAddress; import java.util.Queue; @@ -36,22 +35,11 @@ class EmbeddedChannel extends AbstractChannel { private final SocketAddress remoteAddress = new EmbeddedSocketAddress(); private final Queue productQueue; private int state; // 0 = OPEN, 1 = ACTIVE, 2 = CLOSED - private final java.nio.channels.Channel javaChannel = new java.nio.channels.Channel() { - @Override - public boolean isOpen() { - return state < 2; - } - - @Override - public void close() throws IOException { - // NOOP - } - }; EmbeddedChannel(Queue productQueue) { super(null, null); this.productQueue = productQueue; - firstOut = ChannelBufferHolders.catchAllBuffer(productQueue, ChannelBuffers.dynamicBuffer()); + firstOut = ChannelBufferHolders.catchAllBuffer(); } @Override @@ -59,6 +47,11 @@ class EmbeddedChannel extends AbstractChannel { return config; } + @Override + public boolean isOpen() { + return state < 2; + } + @Override public boolean isActive() { return state == 1; @@ -69,11 +62,6 @@ class EmbeddedChannel extends AbstractChannel { return loop instanceof EmbeddedEventLoop; } - @Override - protected java.nio.channels.Channel javaChannel() { - return javaChannel; - } - @Override @SuppressWarnings("unchecked") protected ChannelBufferHolder firstOut() { @@ -100,16 +88,6 @@ class EmbeddedChannel extends AbstractChannel { // NOOP } - @Override - protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { - return true; - } - - @Override - protected void doFinishConnect() throws Exception { - // NOOP - } - @Override protected void doDisconnect() throws Exception { doClose(); @@ -126,16 +104,37 @@ class EmbeddedChannel extends AbstractChannel { } @Override - protected int doWriteBytes(ChannelBuffer buf, boolean lastSpin) throws Exception { - int length = buf.readableBytes(); - if (length > 0) { - productQueue.add(buf.readBytes(length)); + protected void doFlush(ChannelBufferHolder buf) throws Exception { + ChannelBuffer byteBuf = buf.byteBuffer(); + int byteBufLen = byteBuf.readableBytes(); + if (byteBufLen > 0) { + productQueue.add(byteBuf.readBytes(byteBufLen)); + writeCounter += byteBufLen; + byteBuf.clear(); + } + Queue msgBuf = buf.messageBuffer(); + if (!msgBuf.isEmpty()) { + productQueue.addAll(msgBuf); + writeCounter += msgBuf.size(); + msgBuf.clear(); } - return length; } @Override - protected boolean inEventLoopDrivenFlush() { + protected Unsafe newUnsafe() { + return new DefaultUnsafe(); + } + + @Override + protected boolean isFlushPending() { return false; } + + private class DefaultUnsafe extends AbstractUnsafe { + @Override + public void connect(SocketAddress remoteAddress, + SocketAddress localAddress, ChannelFuture future) { + future.setSuccess(); + } + } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 1ac0347c04..b033d79835 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -15,22 +15,17 @@ */ package io.netty.channel; -import io.netty.buffer.ChannelBuffer; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import io.netty.util.DefaultAttributeMap; import java.io.IOException; -import java.net.ConnectException; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.Deque; -import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; /** * A skeletal {@link Channel} implementation. @@ -85,17 +80,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private volatile EventLoop eventLoop; private volatile boolean registered; - /** - * The future of the current connection attempt. If not null, subsequent - * connection attempts will fail. - */ - private ChannelFuture connectFuture; - private ScheduledFuture connectTimeoutFuture; - private ConnectException connectTimeoutException; - - private long flushedAmount; - private final Deque flushCheckpoints = new ArrayDeque(); private ClosedChannelException closedChannelException; + private final Deque flushCheckpoints = new ArrayDeque(); + protected long writeCounter; /** Cache for the string representation of this channel */ private boolean strValActive; @@ -125,7 +112,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha this.parent = parent; this.id = id; - unsafe = new DefaultUnsafe(); + unsafe = newUnsafe(); closeFuture().addListener(new ChannelFutureListener() { @Override @@ -194,11 +181,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha remoteAddress = null; } - @Override - public boolean isOpen() { - return unsafe().ch().isOpen(); - } - @Override public boolean isRegistered() { return registered; @@ -314,6 +296,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return unsafe; } + protected abstract Unsafe newUnsafe(); + /** * Returns the {@linkplain System#identityHashCode(Object) identity hash code} * of this channel. @@ -376,12 +360,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return strVal; } - private class DefaultUnsafe implements Unsafe { - - @Override - public java.nio.channels.Channel ch() { - return javaChannel(); - } + protected abstract class AbstractUnsafe implements Unsafe { @Override public ChannelBufferHolder out() { @@ -474,85 +453,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } - @Override - public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelFuture future) { - if (eventLoop().inEventLoop()) { - if (!ensureOpen(future)) { - return; - } - - try { - if (connectFuture != null) { - throw new IllegalStateException("connection attempt already made"); - } - - boolean wasActive = isActive(); - if (doConnect(remoteAddress, localAddress)) { - future.setSuccess(); - if (!wasActive && isActive()) { - pipeline().fireChannelActive(); - } - } else { - connectFuture = future; - - // Schedule connect timeout. - int connectTimeoutMillis = config().getConnectTimeoutMillis(); - if (connectTimeoutMillis > 0) { - connectTimeoutFuture = eventLoop().schedule(new Runnable() { - @Override - public void run() { - if (connectTimeoutException == null) { - connectTimeoutException = new ConnectException("connection timed out"); - } - ChannelFuture connectFuture = AbstractChannel.this.connectFuture; - if (connectFuture == null) { - return; - } else { - if (connectFuture.setFailure(connectTimeoutException)) { - pipeline().fireExceptionCaught(connectTimeoutException); - close(voidFuture()); - } - } - } - }, connectTimeoutMillis, TimeUnit.MILLISECONDS); - } - } - } catch (Throwable t) { - future.setFailure(t); - pipeline().fireExceptionCaught(t); - closeIfClosed(); - } - } else { - eventLoop().execute(new Runnable() { - @Override - public void run() { - connect(remoteAddress, localAddress, future); - } - }); - } - } - - @Override - public void finishConnect() { - assert eventLoop().inEventLoop(); - assert connectFuture != null; - try { - boolean wasActive = isActive(); - doFinishConnect(); - connectFuture.setSuccess(); - if (!wasActive && isActive()) { - pipeline().fireChannelActive(); - } - } catch (Throwable t) { - connectFuture.setFailure(t); - pipeline().fireExceptionCaught(t); - closeIfClosed(); - } finally { - connectTimeoutFuture.cancel(false); - connectFuture = null; - } - } - @Override public void disconnect(final ChannelFuture future) { if (eventLoop().inEventLoop()) { @@ -636,67 +536,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } - @Override - public void read() { - assert eventLoop().inEventLoop(); - - final ChannelBufferHolder buf = pipeline().nextIn(); - boolean closed = false; - boolean read = false; - try { - if (buf.hasMessageBuffer()) { - Queue msgBuf = buf.messageBuffer(); - for (;;) { - int localReadAmount = doReadMessages(msgBuf); - if (localReadAmount > 0) { - read = true; - } else if (localReadAmount == 0) { - break; - } else if (localReadAmount < 0) { - closed = true; - break; - } - } - } else { - ChannelBuffer byteBuf = buf.byteBuffer(); - for (;;) { - int localReadAmount = doReadBytes(byteBuf); - if (localReadAmount > 0) { - read = true; - } else if (localReadAmount < 0) { - closed = true; - break; - } - if (!expandReadBuffer(byteBuf)) { - break; - } - } - } - } catch (Throwable t) { - if (read) { - read = false; - pipeline.fireInboundBufferUpdated(); - } - pipeline().fireExceptionCaught(t); - if (t instanceof IOException) { - close(voidFuture()); - } - } finally { - if (read) { - pipeline.fireInboundBufferUpdated(); - } - if (closed && isOpen()) { - close(voidFuture()); - } - } - } - @Override public void flush(final ChannelFuture future) { if (eventLoop().inEventLoop()) { // Append flush future to the notification list. if (future != voidFuture) { - long checkpoint = flushedAmount + out().size(); + long checkpoint = writeCounter + out().size(); if (future instanceof FlushCheckpoint) { FlushCheckpoint cp = (FlushCheckpoint) future; cp.flushCheckpoint(checkpoint); @@ -709,10 +554,20 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha // Attempt/perform outbound I/O if: // - the channel is inactive - flush0() will fail the futures. // - the event loop has no plan to call flushForcibly(). - if (!isActive() || !inEventLoopDrivenFlush()) { - // Note that we don't call flushForcibly() because otherwise its stack trace - // will be confusing. - flush0(); + try { + if (!isActive() || !isFlushPending()) { + doFlush(out()); + } + } catch (Throwable t) { + notifyFlushFutures(t); + pipeline().fireExceptionCaught(t); + if (t instanceof IOException) { + close(voidFuture()); + } + } finally { + if (!isActive()) { + close(unsafe().voidFuture()); + } } } else { eventLoop().execute(new Runnable() { @@ -724,115 +579,23 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } - @Override - public void flushForcibly() { - flush0(); - } - - private void flush0() { - // Perform outbound I/O. + public void flushNow() { try { - ChannelBufferHolder out = out(); - if (out.hasByteBuffer()) { - flushByteBuf(out.byteBuffer()); - } else { - flushMessageBuf(out.messageBuffer()); - } + doFlush(out()); } catch (Throwable t) { notifyFlushFutures(t); pipeline().fireExceptionCaught(t); - close(voidFuture()); - } finally { - if (!isActive()) { + if (t instanceof IOException) { close(voidFuture()); } - } - } - - private void flushByteBuf(ChannelBuffer buf) throws Exception { - if (!buf.readable()) { - // Reset reader/writerIndex to 0 if the buffer is empty. - buf.clear(); - return; - } - - for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { - int localFlushedAmount = doWriteBytes(buf, i == 0); - if (localFlushedAmount > 0) { - flushedAmount += localFlushedAmount; - notifyFlushFutures(); - break; - } - if (!buf.readable()) { - // Reset reader/writerIndex to 0 if the buffer is empty. - buf.clear(); - break; + } finally { + if (!isActive()) { + close(unsafe().voidFuture()); } } } - private void flushMessageBuf(Queue buf) throws Exception { - final int writeSpinCount = config().getWriteSpinCount() - 1; - while (!buf.isEmpty()) { - boolean wrote = false; - for (int i = writeSpinCount; i >= 0; i --) { - int localFlushedAmount = doWriteMessages(buf, i == 0); - if (localFlushedAmount > 0) { - flushedAmount += localFlushedAmount; - wrote = true; - notifyFlushFutures(); - break; - } - } - - if (!wrote) { - break; - } - } - } - - private void notifyFlushFutures() { - if (flushCheckpoints.isEmpty()) { - return; - } - - final long flushedAmount = AbstractChannel.this.flushedAmount; - for (;;) { - FlushCheckpoint cp = flushCheckpoints.poll(); - if (cp == null) { - break; - } - if (cp.flushCheckpoint() > flushedAmount) { - break; - } - cp.future().setSuccess(); - } - - // Avoid overflow - if (flushCheckpoints.isEmpty()) { - // Reset the counter if there's nothing in the notification list. - AbstractChannel.this.flushedAmount = 0; - } else if (flushedAmount >= 0x1000000000000000L) { - // Otherwise, reset the counter only when the counter grew pretty large - // so that we can reduce the cost of updating all entries in the notification list. - AbstractChannel.this.flushedAmount = 0; - for (FlushCheckpoint cp: flushCheckpoints) { - cp.flushCheckpoint(cp.flushCheckpoint() - flushedAmount); - } - } - } - - private void notifyFlushFutures(Throwable cause) { - for (;;) { - FlushCheckpoint cp = flushCheckpoints.poll(); - if (cp == null) { - break; - } - cp.future().setFailure(cause); - } - } - - private boolean ensureOpen(ChannelFuture future) { + protected boolean ensureOpen(ChannelFuture future) { if (isOpen()) { return true; } @@ -843,7 +606,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return false; } - private void closeIfClosed() { + protected void closeIfClosed() { if (isOpen()) { return; } @@ -851,6 +614,63 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } + protected abstract boolean isCompatible(EventLoop loop); + + protected abstract ChannelBufferHolder firstOut(); + + protected abstract SocketAddress localAddress0(); + protected abstract SocketAddress remoteAddress0(); + + protected abstract void doRegister() throws Exception; + protected abstract void doBind(SocketAddress localAddress) throws Exception; + protected abstract void doDisconnect() throws Exception; + protected abstract void doClose() throws Exception; + protected abstract void doDeregister() throws Exception; + protected abstract void doFlush(ChannelBufferHolder buf) throws Exception; + + protected abstract boolean isFlushPending(); + + protected void notifyFlushFutures() { + if (flushCheckpoints.isEmpty()) { + return; + } + + final long flushedAmount = AbstractChannel.this.writeCounter; + for (;;) { + FlushCheckpoint cp = flushCheckpoints.poll(); + if (cp == null) { + break; + } + if (cp.flushCheckpoint() > flushedAmount) { + break; + } + cp.future().setSuccess(); + } + + // Avoid overflow + if (flushCheckpoints.isEmpty()) { + // Reset the counter if there's nothing in the notification list. + AbstractChannel.this.writeCounter = 0; + } else if (flushedAmount >= 0x1000000000000000L) { + // Otherwise, reset the counter only when the counter grew pretty large + // so that we can reduce the cost of updating all entries in the notification list. + AbstractChannel.this.writeCounter = 0; + for (FlushCheckpoint cp: flushCheckpoints) { + cp.flushCheckpoint(cp.flushCheckpoint() - flushedAmount); + } + } + } + + protected void notifyFlushFutures(Throwable cause) { + for (;;) { + FlushCheckpoint cp = flushCheckpoints.poll(); + if (cp == null) { + break; + } + cp.future().setFailure(cause); + } + } + static abstract class FlushCheckpoint { abstract long flushCheckpoint(); abstract void flushCheckpoint(long checkpoint); @@ -904,48 +724,4 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return set; } } - - protected abstract boolean isCompatible(EventLoop loop); - - protected abstract java.nio.channels.Channel javaChannel(); - protected abstract ChannelBufferHolder firstOut(); - - protected abstract SocketAddress localAddress0(); - protected abstract SocketAddress remoteAddress0(); - - protected abstract void doRegister() throws Exception; - protected abstract void doBind(SocketAddress localAddress) throws Exception; - protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception; - protected abstract void doFinishConnect() throws Exception; - protected abstract void doDisconnect() throws Exception; - protected abstract void doClose() throws Exception; - protected abstract void doDeregister() throws Exception; - - protected int doReadMessages(Queue buf) throws Exception { - throw new UnsupportedOperationException(); - } - - protected int doReadBytes(ChannelBuffer buf) throws Exception { - throw new UnsupportedOperationException(); - } - - protected int doWriteMessages(Queue buf, boolean lastSpin) throws Exception { - throw new UnsupportedOperationException(); - } - - protected int doWriteBytes(ChannelBuffer buf, boolean lastSpin) throws Exception { - throw new UnsupportedOperationException(); - } - - protected abstract boolean inEventLoopDrivenFlush(); - - private static boolean expandReadBuffer(ChannelBuffer byteBuf) { - if (!byteBuf.writable()) { - // FIXME: Use a sensible value. - byteBuf.ensureWritableBytes(4096); - return true; - } - - return false; - } } diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index 7ba12cc8ca..d868b3ec5e 100644 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -16,9 +16,6 @@ package io.netty.channel; import java.net.SocketAddress; -import java.util.AbstractQueue; -import java.util.Collections; -import java.util.Iterator; /** * A skeletal server-side {@link Channel} implementation. A server-side @@ -32,8 +29,6 @@ import java.util.Iterator; */ public abstract class AbstractServerChannel extends AbstractChannel implements ServerChannel { - private final ChannelBufferHolder out = ChannelBufferHolders.messageBuffer(new NoopQueue()); - /** * Creates a new instance. */ @@ -43,7 +38,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S @Override public ChannelBufferHolder out() { - return out; + return ChannelBufferHolders.discardBuffer(); } @Override @@ -53,7 +48,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S @Override protected ChannelBufferHolder firstOut() { - return out; + return ChannelBufferHolders.discardBuffer(); } @Override @@ -61,50 +56,8 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S return null; } - @Override - protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { - throw new UnsupportedOperationException(); - } - - @Override - protected void doFinishConnect() throws Exception { - throw new UnsupportedOperationException(); - } - @Override protected void doDisconnect() throws Exception { throw new UnsupportedOperationException(); } - - @Override - protected boolean inEventLoopDrivenFlush() { - return false; - } - - private static class NoopQueue extends AbstractQueue { - @Override - public boolean offer(Object e) { - return false; - } - - @Override - public Object poll() { - return null; - } - - @Override - public Object peek() { - return null; - } - - @Override - public Iterator iterator() { - return Collections.emptyList().iterator(); - } - - @Override - public int size() { - return 0; - } - } } diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index 56ebb4e075..9d36ae35a0 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -172,7 +172,6 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu Unsafe unsafe(); public interface Unsafe { - java.nio.channels.Channel ch(); ChannelBufferHolder out(); ChannelFuture voidFuture(); @@ -182,13 +181,11 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu void register(EventLoop eventLoop, ChannelFuture future); void bind(SocketAddress localAddress, ChannelFuture future); void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future); - void finishConnect(); void disconnect(ChannelFuture future); void close(ChannelFuture future); void deregister(ChannelFuture future); - void read(); void flush(ChannelFuture future); - void flushForcibly(); + void flushNow(); } } diff --git a/transport/src/main/java/io/netty/channel/ChannelBufferHolders.java b/transport/src/main/java/io/netty/channel/ChannelBufferHolders.java index 01a3726481..e4b97a6763 100644 --- a/transport/src/main/java/io/netty/channel/ChannelBufferHolders.java +++ b/transport/src/main/java/io/netty/channel/ChannelBufferHolders.java @@ -1,13 +1,29 @@ package io.netty.channel; +import io.netty.buffer.AbstractChannelBuffer; import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBufferFactory; import io.netty.buffer.ChannelBuffers; +import io.netty.buffer.HeapChannelBufferFactory; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; +import java.util.AbstractQueue; import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Iterator; import java.util.Queue; public final class ChannelBufferHolders { + private static final ChannelBufferHolder DISCARD_BUFFER = + new ChannelBufferHolder(new NoopQueue(), new NoopByteBuf()); + public static ChannelBufferHolder messageBuffer() { return messageBuffer(new ArrayDeque()); } @@ -41,7 +57,202 @@ public final class ChannelBufferHolders { return new ChannelBufferHolder(msgBuf, byteBuf); } + @SuppressWarnings("unchecked") + public static ChannelBufferHolder discardBuffer() { + return (ChannelBufferHolder) DISCARD_BUFFER; + } + private ChannelBufferHolders() { // Utility class } + + private static class NoopQueue extends AbstractQueue { + @Override + public boolean offer(Object e) { + return false; + } + + @Override + public E poll() { + return null; + } + + @Override + public E peek() { + return null; + } + + @Override + public Iterator iterator() { + return (Iterator) Collections.emptyList().iterator(); + } + + @Override + public int size() { + return 0; + } + } + + private static class NoopByteBuf extends AbstractChannelBuffer { + + @Override + public ChannelBufferFactory factory() { + return HeapChannelBufferFactory.getInstance(); + } + + @Override + public int capacity() { + return 0; + } + + @Override + public ByteOrder order() { + return ByteOrder.BIG_ENDIAN; + } + + @Override + public boolean isDirect() { + return false; + } + + @Override + public byte getByte(int index) { + return 0; + } + + @Override + public short getShort(int index) { + return 0; + } + + @Override + public int getUnsignedMedium(int index) { + return 0; + } + + @Override + public int getInt(int index) { + return 0; + } + + @Override + public long getLong(int index) { + return 0; + } + + @Override + public void getBytes(int index, ChannelBuffer dst, int dstIndex, int length) { + // NOOP + } + + @Override + public void getBytes(int index, byte[] dst, int dstIndex, int length) { + // NOOP + } + + @Override + public void getBytes(int index, ByteBuffer dst) { + // NOOP + } + + @Override + public void getBytes(int index, OutputStream out, int length) + throws IOException { + // NOOP + } + + @Override + public int getBytes(int index, GatheringByteChannel out, int length) + throws IOException { + return 0; + } + + @Override + public void setByte(int index, int value) { + // NOOP + } + + @Override + public void setShort(int index, int value) { + // NOOP + } + + @Override + public void setMedium(int index, int value) { + // NOOP + } + + @Override + public void setInt(int index, int value) { + // NOOP + } + + @Override + public void setLong(int index, long value) { + // NOOP + } + + @Override + public void setBytes(int index, ChannelBuffer src, int srcIndex, + int length) { + // NOOP + } + + @Override + public void setBytes(int index, byte[] src, int srcIndex, int length) { + // NOOP + } + + @Override + public void setBytes(int index, ByteBuffer src) { + // NOOP + } + + @Override + public int setBytes(int index, InputStream in, int length) + throws IOException { + return 0; + } + + @Override + public int setBytes(int index, ScatteringByteChannel in, int length) + throws IOException { + return 0; + } + + @Override + public ChannelBuffer copy(int index, int length) { + return this; + } + + @Override + public ChannelBuffer slice(int index, int length) { + return this; + } + + @Override + public ChannelBuffer duplicate() { + return this; + } + + @Override + public ByteBuffer toByteBuffer(int index, int length) { + return ByteBuffer.allocate(0); + } + + @Override + public boolean hasArray() { + return true; + } + + @Override + public byte[] array() { + return new byte[0]; + } + + @Override + public int arrayOffset() { + return 0; + } + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java index b4442ef857..a71117a402 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -15,22 +15,68 @@ */ package io.netty.channel.socket.nio; +import io.netty.buffer.ChannelBuffer; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoop; +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; +import java.io.IOException; +import java.net.ConnectException; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; +import java.util.Queue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; public abstract class AbstractNioChannel extends AbstractChannel { + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(AbstractNioChannel.class); + private final SelectableChannel ch; + private final int defaultInterestOps; private volatile SelectionKey selectionKey; - protected AbstractNioChannel(Channel parent, Integer id, SelectableChannel ch) { + /** + * The future of the current connection attempt. If not null, subsequent + * connection attempts will fail. + */ + private ChannelFuture connectFuture; + private ScheduledFuture connectTimeoutFuture; + private ConnectException connectTimeoutException; + + protected AbstractNioChannel(Channel parent, Integer id, SelectableChannel ch, int defaultInterestOps) { super(parent, id); this.ch = ch; + this.defaultInterestOps = defaultInterestOps; + try { + ch.configureBlocking(false); + } catch (IOException e) { + try { + ch.close(); + } catch (IOException e2) { + if (logger.isWarnEnabled()) { + logger.warn( + "Failed to close a partially initialized socket.", e2); + } + + } + + throw new ChannelException("Failed to enter non-blocking mode.", e); + } + } + + @Override + public boolean isOpen() { + return ch.isOpen(); } @Override @@ -44,6 +90,15 @@ public abstract class AbstractNioChannel extends AbstractChannel { } @Override + public NioUnsafe unsafe() { + return (NioUnsafe) super.unsafe(); + } + + @Override + protected NioUnsafe newUnsafe() { + return new DefaultNioUnsafe(); + } + protected SelectableChannel javaChannel() { return ch; } @@ -59,13 +114,249 @@ public abstract class AbstractNioChannel extends AbstractChannel { } @Override - protected void doRegister() throws Exception { - NioChildEventLoop loop = (NioChildEventLoop) eventLoop(); - selectionKey = javaChannel().register(loop.selector, isActive()? SelectionKey.OP_READ : 0, this); + protected boolean isFlushPending() { + return (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0; } @Override - protected boolean inEventLoopDrivenFlush() { - return (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0; + protected void doRegister() throws Exception { + NioChildEventLoop loop = (NioChildEventLoop) eventLoop(); + selectionKey = javaChannel().register( + loop.selector, isActive()? defaultInterestOps : 0, this); + } + + @Override + protected void doDeregister() throws Exception { + selectionKey().cancel(); + ((NioChildEventLoop) eventLoop()).cancelledKeys ++; + } + + protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception; + + protected abstract void doFinishConnect() throws Exception; + + protected int doReadMessages(Queue buf) throws Exception { + throw new UnsupportedOperationException(); + } + + protected int doReadBytes(ChannelBuffer buf) throws Exception { + throw new UnsupportedOperationException(); + } + + protected int doWriteMessages(Queue buf, boolean lastSpin) throws Exception { + throw new UnsupportedOperationException(); + } + + protected int doWriteBytes(ChannelBuffer buf, boolean lastSpin) throws Exception { + throw new UnsupportedOperationException(); + } + + public interface NioUnsafe extends Unsafe { + java.nio.channels.Channel ch(); + void finishConnect(); + void read(); + } + + private class DefaultNioUnsafe extends AbstractUnsafe implements NioUnsafe { + @Override + public java.nio.channels.Channel ch() { + return javaChannel(); + } + + @Override + public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelFuture future) { + if (eventLoop().inEventLoop()) { + if (!ensureOpen(future)) { + return; + } + + try { + if (connectFuture != null) { + throw new IllegalStateException("connection attempt already made"); + } + + boolean wasActive = isActive(); + if (doConnect(remoteAddress, localAddress)) { + future.setSuccess(); + if (!wasActive && isActive()) { + pipeline().fireChannelActive(); + } + } else { + connectFuture = future; + + // Schedule connect timeout. + int connectTimeoutMillis = config().getConnectTimeoutMillis(); + if (connectTimeoutMillis > 0) { + connectTimeoutFuture = eventLoop().schedule(new Runnable() { + @Override + public void run() { + if (connectTimeoutException == null) { + connectTimeoutException = new ConnectException("connection timed out"); + } + ChannelFuture connectFuture = AbstractNioChannel.this.connectFuture; + if (connectFuture == null) { + return; + } else { + if (connectFuture.setFailure(connectTimeoutException)) { + pipeline().fireExceptionCaught(connectTimeoutException); + close(voidFuture()); + } + } + } + }, connectTimeoutMillis, TimeUnit.MILLISECONDS); + } + } + } catch (Throwable t) { + future.setFailure(t); + pipeline().fireExceptionCaught(t); + closeIfClosed(); + } + } else { + eventLoop().execute(new Runnable() { + @Override + public void run() { + connect(remoteAddress, localAddress, future); + } + }); + } + } + + @Override + public void finishConnect() { + assert eventLoop().inEventLoop(); + assert connectFuture != null; + try { + boolean wasActive = isActive(); + doFinishConnect(); + connectFuture.setSuccess(); + if (!wasActive && isActive()) { + pipeline().fireChannelActive(); + } + } catch (Throwable t) { + connectFuture.setFailure(t); + pipeline().fireExceptionCaught(t); + closeIfClosed(); + } finally { + connectTimeoutFuture.cancel(false); + connectFuture = null; + } + } + + @Override + public void read() { + assert eventLoop().inEventLoop(); + + final ChannelPipeline pipeline = pipeline(); + final ChannelBufferHolder buf = pipeline.nextIn(); + boolean closed = false; + boolean read = false; + try { + if (buf.hasMessageBuffer()) { + Queue msgBuf = buf.messageBuffer(); + for (;;) { + int localReadAmount = doReadMessages(msgBuf); + if (localReadAmount > 0) { + read = true; + } else if (localReadAmount == 0) { + break; + } else if (localReadAmount < 0) { + closed = true; + break; + } + } + } else { + ChannelBuffer byteBuf = buf.byteBuffer(); + for (;;) { + int localReadAmount = doReadBytes(byteBuf); + if (localReadAmount > 0) { + read = true; + } else if (localReadAmount < 0) { + closed = true; + break; + } + if (!expandReadBuffer(byteBuf)) { + break; + } + } + } + } catch (Throwable t) { + if (read) { + read = false; + pipeline.fireInboundBufferUpdated(); + } + pipeline().fireExceptionCaught(t); + if (t instanceof IOException) { + close(voidFuture()); + } + } finally { + if (read) { + pipeline.fireInboundBufferUpdated(); + } + if (closed && isOpen()) { + close(voidFuture()); + } + } + } + } + + @Override + protected void doFlush(ChannelBufferHolder buf) throws Exception { + if (buf.hasByteBuffer()) { + flushByteBuf(buf.byteBuffer()); + } else { + flushMessageBuf(buf.messageBuffer()); + } + } + + private void flushByteBuf(ChannelBuffer buf) throws Exception { + if (!buf.readable()) { + // Reset reader/writerIndex to 0 if the buffer is empty. + buf.clear(); + return; + } + + for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { + int localFlushedAmount = doWriteBytes(buf, i == 0); + if (localFlushedAmount > 0) { + writeCounter += localFlushedAmount; + notifyFlushFutures(); + break; + } + if (!buf.readable()) { + // Reset reader/writerIndex to 0 if the buffer is empty. + buf.clear(); + break; + } + } + } + + private void flushMessageBuf(Queue buf) throws Exception { + final int writeSpinCount = config().getWriteSpinCount() - 1; + while (!buf.isEmpty()) { + boolean wrote = false; + for (int i = writeSpinCount; i >= 0; i --) { + int localFlushedAmount = doWriteMessages(buf, i == 0); + if (localFlushedAmount > 0) { + writeCounter += localFlushedAmount; + wrote = true; + notifyFlushFutures(); + break; + } + } + + if (!wrote) { + break; + } + } + } + + private static boolean expandReadBuffer(ChannelBuffer byteBuf) { + if (!byteBuf.writable()) { + // FIXME: Use a sensible value. + byteBuf.ensureWritableBytes(4096); + return true; + } + + return false; } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioChildEventLoop.java b/transport/src/main/java/io/netty/channel/socket/nio/NioChildEventLoop.java index 03c47ebaa7..8d4c3a36f7 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioChildEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioChildEventLoop.java @@ -16,9 +16,9 @@ package io.netty.channel.socket.nio; import io.netty.channel.Channel; -import io.netty.channel.Channel.Unsafe; import io.netty.channel.ChannelException; import io.netty.channel.SingleThreadEventLoop; +import io.netty.channel.socket.nio.AbstractNioChannel.NioUnsafe; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; @@ -171,8 +171,8 @@ final class NioChildEventLoop extends SingleThreadEventLoop { for (i = selectedKeys.iterator(); i.hasNext();) { final SelectionKey k = i.next(); i.remove(); - final Channel ch = (Channel) k.attachment(); - final Unsafe unsafe = ch.unsafe(); + final AbstractNioChannel ch = (AbstractNioChannel) k.attachment(); + final NioUnsafe unsafe = ch.unsafe(); try { int readyOps = k.readyOps(); if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { @@ -183,7 +183,7 @@ final class NioChildEventLoop extends SingleThreadEventLoop { } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { - unsafe.flushForcibly(); + unsafe.flushNow(); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { unsafe.finishConnect(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index 54c87211fd..0b80c40b77 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -22,8 +22,6 @@ import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.socket.DatagramChannelConfig; import io.netty.channel.socket.DatagramPacket; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.DetectionUtil; import java.io.IOException; @@ -48,8 +46,6 @@ import java.util.Queue; */ public final class NioDatagramChannel extends AbstractNioChannel implements io.netty.channel.socket.DatagramChannel { - private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioDatagramChannel.class); - private final DatagramChannelConfig config; private final Map> memberships = new HashMap>(); @@ -72,25 +68,8 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n } public NioDatagramChannel(Integer id, DatagramChannel socket) { - super(null, id, socket); - try { - socket.configureBlocking(false); - } catch (IOException e) { - try { - socket.close(); - } catch (IOException e2) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to close a partially initialized socket.", e2); - } - - } - - throw new ChannelException("Failed to enter non-blocking mode.", e); - } - + super(null, id, socket, SelectionKey.OP_READ); config = new NioDatagramChannelConfig(socket); - } @Override @@ -165,12 +144,6 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n javaChannel().close(); } - @Override - protected void doDeregister() throws Exception { - selectionKey().cancel(); - ((NioChildEventLoop) eventLoop()).cancelledKeys ++; - } - @Override protected int doReadMessages(Queue buf) throws Exception { DatagramChannel ch = javaChannel(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index b604c7c69b..1a795cd747 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -15,13 +15,11 @@ */ package io.netty.channel.socket.nio; -import io.netty.channel.AbstractServerChannel; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelException; -import io.netty.channel.EventLoop; import io.netty.channel.socket.DefaultServerSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannelConfig; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; @@ -30,43 +28,23 @@ import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.util.Queue; -public class NioServerSocketChannel extends AbstractServerChannel +public class NioServerSocketChannel extends AbstractNioChannel implements io.netty.channel.socket.ServerSocketChannel { - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(NioServerSocketChannel.class); - - private final ServerSocketChannel socket; - private final ServerSocketChannelConfig config; - private volatile SelectionKey selectionKey; - - public NioServerSocketChannel() { - super(null); - + private static ServerSocketChannel newSocket() { try { - socket = ServerSocketChannel.open(); + return ServerSocketChannel.open(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } + } - try { - socket.configureBlocking(false); - } catch (IOException e) { - try { - socket.close(); - } catch (IOException e2) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to close a partially initialized socket.", e2); - } + private final ServerSocketChannelConfig config; - } - - throw new ChannelException("Failed to enter non-blocking mode.", e); - } - - config = new DefaultServerSocketChannelConfig(socket.socket()); + public NioServerSocketChannel() { + super(null, null, newSocket(), SelectionKey.OP_ACCEPT); + config = new DefaultServerSocketChannelConfig(javaChannel().socket()); } @Override @@ -79,11 +57,6 @@ public class NioServerSocketChannel extends AbstractServerChannel return javaChannel().socket().isBound(); } - @Override - public InetSocketAddress localAddress() { - return (InetSocketAddress) super.localAddress(); - } - @Override public InetSocketAddress remoteAddress() { return null; @@ -91,29 +64,18 @@ public class NioServerSocketChannel extends AbstractServerChannel @Override protected java.nio.channels.ServerSocketChannel javaChannel() { - return socket; + return (ServerSocketChannel) super.javaChannel(); } @Override protected SocketAddress localAddress0() { - return socket.socket().getLocalSocketAddress(); - } - - @Override - protected boolean isCompatible(EventLoop loop) { - return loop instanceof NioChildEventLoop; - } - - @Override - protected void doRegister() throws Exception { - NioChildEventLoop loop = (NioChildEventLoop) eventLoop(); - selectionKey = javaChannel().register( - loop.selector, isActive()? SelectionKey.OP_ACCEPT : 0, this); + return javaChannel().socket().getLocalSocketAddress(); } @Override protected void doBind(SocketAddress localAddress) throws Exception { javaChannel().socket().bind(localAddress); + SelectionKey selectionKey = selectionKey(); selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_ACCEPT); } @@ -122,11 +84,6 @@ public class NioServerSocketChannel extends AbstractServerChannel javaChannel().close(); } - @Override - protected void doDeregister() throws Exception { - selectionKey.cancel(); - } - @Override protected int doReadMessages(Queue buf) throws Exception { java.nio.channels.SocketChannel ch = javaChannel().accept(); @@ -136,4 +93,31 @@ public class NioServerSocketChannel extends AbstractServerChannel buf.add(new NioSocketChannel(this, null, ch)); return 1; } + + // Unnecessary stuff + @Override + protected boolean doConnect( + SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected void doFinishConnect() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected ChannelBufferHolder firstOut() { + return ChannelBufferHolders.discardBuffer(); + } + + @Override + protected SocketAddress remoteAddress0() { + return null; + } + + @Override + protected void doDisconnect() throws Exception { + throw new UnsupportedOperationException(); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index b051e9cf6d..14b63240f5 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -54,7 +54,7 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha } public NioSocketChannel(Channel parent, Integer id, SocketChannel socket) { - super(parent, id, socket); + super(parent, id, socket, SelectionKey.OP_READ); try { socket.configureBlocking(false); } catch (IOException e) { @@ -152,12 +152,6 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha javaChannel().close(); } - @Override - protected void doDeregister() throws Exception { - selectionKey().cancel(); - ((NioChildEventLoop) eventLoop()).cancelledKeys ++; - } - @Override protected int doReadBytes(ChannelBuffer byteBuf) throws Exception { return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes()); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java new file mode 100644 index 0000000000..c46809e0a6 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java @@ -0,0 +1,193 @@ +package io.netty.channel.socket.oio; + +import io.netty.buffer.ChannelBuffer; +import io.netty.channel.AbstractChannel; +import io.netty.channel.Channel; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoop; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Queue; + +abstract class AbstractOioChannel extends AbstractChannel { + + protected AbstractOioChannel(Channel parent, Integer id) { + super(parent, id); + } + + @Override + public InetSocketAddress localAddress() { + return (InetSocketAddress) super.localAddress(); + } + + @Override + public InetSocketAddress remoteAddress() { + return (InetSocketAddress) super.remoteAddress(); + } + + @Override + public OioUnsafe unsafe() { + return (OioUnsafe) super.unsafe(); + } + + @Override + protected OioUnsafe newUnsafe() { + return new DefaultOioUnsafe(); + } + + @Override + protected boolean isCompatible(EventLoop loop) { + return loop instanceof OioChildEventLoop; + } + + @Override + protected void doRegister() throws Exception { + // NOOP + } + + @Override + protected void doDeregister() throws Exception { + // NOOP + } + + @Override + protected boolean isFlushPending() { + return false; + } + + public interface OioUnsafe extends Unsafe { + void read(); + } + + private class DefaultOioUnsafe extends AbstractUnsafe implements OioUnsafe { + + @Override + public void connect( + final SocketAddress remoteAddress, + final SocketAddress localAddress, final ChannelFuture future) { + if (eventLoop().inEventLoop()) { + if (!ensureOpen(future)) { + return; + } + + try { + boolean wasActive = isActive(); + doConnect(remoteAddress, localAddress); + future.setSuccess(); + if (!wasActive && isActive()) { + pipeline().fireChannelActive(); + } + } catch (Throwable t) { + future.setFailure(t); + pipeline().fireExceptionCaught(t); + closeIfClosed(); + } + } else { + eventLoop().execute(new Runnable() { + @Override + public void run() { + connect(remoteAddress, localAddress, future); + } + }); + } + } + + @Override + public void read() { + assert eventLoop().inEventLoop(); + + final ChannelPipeline pipeline = pipeline(); + final ChannelBufferHolder buf = pipeline.nextIn(); + boolean closed = false; + boolean read = false; + try { + if (buf.hasMessageBuffer()) { + Queue msgBuf = buf.messageBuffer(); + int localReadAmount = doReadMessages(msgBuf); + if (localReadAmount > 0) { + read = true; + } else if (localReadAmount < 0) { + closed = true; + } + } else { + ChannelBuffer byteBuf = buf.byteBuffer(); + int localReadAmount = doReadBytes(byteBuf); + if (localReadAmount > 0) { + read = true; + } else if (localReadAmount < 0) { + closed = true; + } + } + } catch (Throwable t) { + if (read) { + read = false; + pipeline.fireInboundBufferUpdated(); + } + pipeline().fireExceptionCaught(t); + if (t instanceof IOException) { + close(voidFuture()); + } + } finally { + if (read) { + pipeline.fireInboundBufferUpdated(); + } + if (closed && isOpen()) { + close(voidFuture()); + } + } + } + } + + protected abstract void doConnect( + SocketAddress remoteAddress, SocketAddress localAddress) throws Exception; + + protected int doReadMessages(Queue buf) throws Exception { + throw new UnsupportedOperationException(); + } + + protected int doReadBytes(ChannelBuffer buf) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected void doFlush(ChannelBufferHolder buf) throws Exception { + if (buf.hasByteBuffer()) { + flushByteBuf(buf.byteBuffer()); + } else { + flushMessageBuf(buf.messageBuffer()); + } + } + + private void flushByteBuf(ChannelBuffer buf) throws Exception { + while (buf.readable()) { + int localFlushedAmount = doWriteBytes(buf); + if (localFlushedAmount > 0) { + writeCounter += localFlushedAmount; + notifyFlushFutures(); + } + } + buf.clear(); + } + + private void flushMessageBuf(Queue buf) throws Exception { + while (!buf.isEmpty()) { + int localFlushedAmount = doWriteMessages(buf); + if (localFlushedAmount > 0) { + writeCounter += localFlushedAmount; + notifyFlushFutures(); + } + } + } + + protected int doWriteMessages(Queue buf) throws Exception { + throw new UnsupportedOperationException(); + } + + protected int doWriteBytes(ChannelBuffer buf) throws Exception { + throw new UnsupportedOperationException(); + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java b/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java index 357c135427..8cfb527155 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java @@ -9,7 +9,7 @@ import io.netty.channel.SingleThreadEventLoop; class OioChildEventLoop extends SingleThreadEventLoop { private final OioEventLoop parent; - private Channel ch; + private AbstractOioChannel ch; OioChildEventLoop(OioEventLoop parent) { super(parent.threadFactory); @@ -22,7 +22,7 @@ class OioChildEventLoop extends SingleThreadEventLoop { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { - ch = future.channel(); + ch = (AbstractOioChannel) future.channel(); } else { deregister(); } @@ -33,7 +33,7 @@ class OioChildEventLoop extends SingleThreadEventLoop { @Override protected void run() { for (;;) { - Channel ch = OioChildEventLoop.this.ch; + AbstractOioChannel ch = OioChildEventLoop.this.ch; if (ch == null || !ch.isActive()) { Runnable task; try { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java index d074ad452f..8ea48b7153 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java @@ -17,12 +17,10 @@ package io.netty.channel.socket.oio; import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; -import io.netty.channel.AbstractChannel; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; -import io.netty.channel.EventLoop; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannelConfig; import io.netty.channel.socket.DatagramPacket; @@ -38,10 +36,9 @@ import java.net.NetworkInterface; import java.net.SocketAddress; import java.net.SocketException; import java.net.SocketTimeoutException; -import java.nio.channels.Channel; import java.util.Queue; -public class OioDatagramChannel extends AbstractChannel +public class OioDatagramChannel extends AbstractOioChannel implements DatagramChannel { private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class); @@ -95,16 +92,6 @@ public class OioDatagramChannel extends AbstractChannel return config; } - @Override - public InetSocketAddress localAddress() { - return (InetSocketAddress) super.localAddress(); - } - - @Override - public InetSocketAddress remoteAddress() { - return (InetSocketAddress) super.remoteAddress(); - } - @Override public boolean isOpen() { return !socket.isClosed(); @@ -115,16 +102,6 @@ public class OioDatagramChannel extends AbstractChannel return isOpen() && socket.isBound(); } - @Override - protected boolean isCompatible(EventLoop loop) { - return loop instanceof OioChildEventLoop; - } - - @Override - protected Channel javaChannel() { - return null; - } - @Override protected ChannelBufferHolder firstOut() { return out; @@ -140,18 +117,13 @@ public class OioDatagramChannel extends AbstractChannel return socket.getRemoteSocketAddress(); } - @Override - protected void doRegister() throws Exception { - // NOOP - } - @Override protected void doBind(SocketAddress localAddress) throws Exception { socket.bind(localAddress); } @Override - protected boolean doConnect(SocketAddress remoteAddress, + protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { socket.bind(localAddress); @@ -161,7 +133,6 @@ public class OioDatagramChannel extends AbstractChannel try { socket.connect(remoteAddress); success = true; - return true; } finally { if (!success) { try { @@ -173,11 +144,6 @@ public class OioDatagramChannel extends AbstractChannel } } - @Override - protected void doFinishConnect() throws Exception { - throw new Error(); - } - @Override protected void doDisconnect() throws Exception { socket.disconnect(); @@ -188,11 +154,6 @@ public class OioDatagramChannel extends AbstractChannel socket.close(); } - @Override - protected void doDeregister() throws Exception { - // NOOP - } - @Override protected int doReadMessages(Queue buf) throws Exception { int packetSize = config().getReceivePacketSize(); @@ -214,7 +175,7 @@ public class OioDatagramChannel extends AbstractChannel } @Override - protected int doWriteMessages(Queue buf, boolean lastSpin) throws Exception { + protected int doWriteMessages(Queue buf) throws Exception { DatagramPacket p = (DatagramPacket) buf.poll(); ChannelBuffer data = p.data(); int length = data.readableBytes(); @@ -231,11 +192,6 @@ public class OioDatagramChannel extends AbstractChannel return 1; } - @Override - protected boolean inEventLoopDrivenFlush() { - return false; - } - @Override public ChannelFuture joinGroup(InetAddress multicastAddress) { return joinGroup(multicastAddress, newFuture()); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java index 136be6c661..b095930ce9 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java @@ -15,9 +15,9 @@ */ package io.netty.channel.socket.oio; -import io.netty.channel.AbstractServerChannel; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelException; -import io.netty.channel.EventLoop; import io.netty.channel.socket.DefaultServerSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannelConfig; @@ -30,12 +30,11 @@ import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; import java.net.SocketTimeoutException; -import java.nio.channels.Channel; import java.util.Queue; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -public class OioServerSocketChannel extends AbstractServerChannel +public class OioServerSocketChannel extends AbstractOioChannel implements ServerSocketChannel { private static final InternalLogger logger = @@ -62,7 +61,7 @@ public class OioServerSocketChannel extends AbstractServerChannel } public OioServerSocketChannel(Integer id, ServerSocket socket) { - super(id); + super(null, id); if (socket == null) { throw new NullPointerException("socket"); } @@ -96,14 +95,9 @@ public class OioServerSocketChannel extends AbstractServerChannel return config; } - @Override - public InetSocketAddress localAddress() { - return (InetSocketAddress) super.localAddress(); - } - @Override public InetSocketAddress remoteAddress() { - return (InetSocketAddress) super.remoteAddress(); + return null; } @Override @@ -116,26 +110,11 @@ public class OioServerSocketChannel extends AbstractServerChannel return isOpen() && socket.isBound(); } - @Override - protected boolean isCompatible(EventLoop loop) { - return loop instanceof OioChildEventLoop; - } - - @Override - protected Channel javaChannel() { - return null; - } - @Override protected SocketAddress localAddress0() { return socket.getLocalSocketAddress(); } - @Override - protected void doRegister() throws Exception { - // NOOP - } - @Override protected void doBind(SocketAddress localAddress) throws Exception { socket.bind(localAddress); @@ -146,11 +125,6 @@ public class OioServerSocketChannel extends AbstractServerChannel socket.close(); } - @Override - protected void doDeregister() throws Exception { - // NOOP - } - @Override protected int doReadMessages(Queue buf) throws Exception { if (socket.isClosed()) { @@ -179,4 +153,25 @@ public class OioServerSocketChannel extends AbstractServerChannel return 0; } + + @Override + protected void doConnect( + SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected ChannelBufferHolder firstOut() { + return ChannelBufferHolders.discardBuffer(); + } + + @Override + protected SocketAddress remoteAddress0() { + return null; + } + + @Override + protected void doDisconnect() throws Exception { + throw new UnsupportedOperationException(); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java index 1f1755502a..bd682e8e7b 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java @@ -16,12 +16,10 @@ package io.netty.channel.socket.oio; import io.netty.buffer.ChannelBuffer; -import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelException; -import io.netty.channel.EventLoop; import io.netty.channel.socket.DefaultSocketChannelConfig; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannelConfig; @@ -29,16 +27,14 @@ import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; -import java.net.InetSocketAddress; +import java.io.PushbackInputStream; import java.net.Socket; import java.net.SocketAddress; import java.net.SocketTimeoutException; import java.nio.channels.NotYetConnectedException; -import java.util.Queue; -public class OioSocketChannel extends AbstractChannel +public class OioSocketChannel extends AbstractOioChannel implements SocketChannel { private static final InternalLogger logger = @@ -47,7 +43,7 @@ public class OioSocketChannel extends AbstractChannel private final Socket socket; private final SocketChannelConfig config; private final ChannelBufferHolder out = ChannelBufferHolders.byteBuffer(); - private InputStream is; + private PushbackInputStream is; private OutputStream os; public OioSocketChannel() { @@ -66,7 +62,7 @@ public class OioSocketChannel extends AbstractChannel boolean success = false; try { if (socket.isConnected()) { - is = socket.getInputStream(); + is = new PushbackInputStream(socket.getInputStream()); os = socket.getOutputStream(); } socket.setSoTimeout(1000); @@ -89,16 +85,6 @@ public class OioSocketChannel extends AbstractChannel return config; } - @Override - public InetSocketAddress localAddress() { - return (InetSocketAddress) super.localAddress(); - } - - @Override - public InetSocketAddress remoteAddress() { - return (InetSocketAddress) super.remoteAddress(); - } - @Override public boolean isOpen() { return !socket.isClosed(); @@ -109,16 +95,6 @@ public class OioSocketChannel extends AbstractChannel return !socket.isClosed() && socket.isConnected(); } - @Override - protected boolean isCompatible(EventLoop loop) { - return loop instanceof OioChildEventLoop; - } - - @Override - protected java.nio.channels.Channel javaChannel() { - return null; - } - @Override @SuppressWarnings("unchecked") protected ChannelBufferHolder firstOut() { @@ -135,18 +111,13 @@ public class OioSocketChannel extends AbstractChannel return socket.getRemoteSocketAddress(); } - @Override - protected void doRegister() throws Exception { - // NOOP - } - @Override protected void doBind(SocketAddress localAddress) throws Exception { socket.bind(localAddress); } @Override - protected boolean doConnect(SocketAddress remoteAddress, + protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { socket.bind(localAddress); @@ -155,10 +126,9 @@ public class OioSocketChannel extends AbstractChannel boolean success = false; try { socket.connect(remoteAddress, config().getConnectTimeoutMillis()); - is = socket.getInputStream(); + is = new PushbackInputStream(socket.getInputStream()); os = socket.getOutputStream(); success = true; - return true; } finally { if (!success) { doClose(); @@ -166,11 +136,6 @@ public class OioSocketChannel extends AbstractChannel } } - @Override - protected void doFinishConnect() throws Exception { - throw new Error(); - } - @Override protected void doDisconnect() throws Exception { doClose(); @@ -181,32 +146,29 @@ public class OioSocketChannel extends AbstractChannel socket.close(); } - @Override - protected void doDeregister() throws Exception { - // NOOP - } - - @Override - protected int doReadMessages(Queue buf) throws Exception { - throw new Error(); - } - @Override protected int doReadBytes(ChannelBuffer buf) throws Exception { if (socket.isClosed()) { return -1; } + int b; try { - int readBytes = buf.writeBytes(is, buf.writableBytes()); - return readBytes; + b = is.read(); + if (b < 0) { + return -1; + } + is.unread(b); + + int available = is.available(); + buf.ensureWritableBytes(available); + return buf.writeBytes(is, available); } catch (SocketTimeoutException e) { - // Expected return 0; } } @Override - protected int doWriteBytes(ChannelBuffer buf, boolean lastSpin) throws Exception { + protected int doWriteBytes(ChannelBuffer buf) throws Exception { OutputStream os = this.os; if (os == null) { throw new NotYetConnectedException(); @@ -215,9 +177,4 @@ public class OioSocketChannel extends AbstractChannel buf.readBytes(os, length); return length; } - - @Override - protected boolean inEventLoopDrivenFlush() { - return false; - } }