diff --git a/buffer/src/main/java/io/netty/buffer/AbstractChannelBuffer.java b/buffer/src/main/java/io/netty/buffer/AbstractChannelBuffer.java index 76789fae35..7a47743302 100644 --- a/buffer/src/main/java/io/netty/buffer/AbstractChannelBuffer.java +++ b/buffer/src/main/java/io/netty/buffer/AbstractChannelBuffer.java @@ -76,7 +76,7 @@ public abstract class AbstractChannelBuffer implements ChannelBuffer { @Override public boolean readable() { - return readableBytes() > 0; + return writerIndex > readerIndex; } @Override @@ -119,6 +119,12 @@ public abstract class AbstractChannelBuffer implements ChannelBuffer { if (readerIndex == 0) { return; } + + if (readerIndex == writerIndex) { + clear(); + return; + } + setBytes(0, this, readerIndex, writerIndex - readerIndex); writerIndex -= readerIndex; markedReaderIndex = Math.max(markedReaderIndex - readerIndex, 0); @@ -132,7 +138,7 @@ public abstract class AbstractChannelBuffer implements ChannelBuffer { throw new IndexOutOfBoundsException(); } } - + @Override public boolean getBoolean(int index) { return getByte(index) != 0; @@ -195,7 +201,7 @@ public abstract class AbstractChannelBuffer implements ChannelBuffer { getBytes(index, dst, dst.writerIndex(), length); dst.writerIndex(dst.writerIndex() + length); } - + @Override public void setBoolean(int index, boolean value) { setByte(index, value ? 1 : 0); @@ -267,7 +273,7 @@ public abstract class AbstractChannelBuffer implements ChannelBuffer { } } } - + @Override public byte readByte() { if (readerIndex == writerIndex) { @@ -275,7 +281,7 @@ public abstract class AbstractChannelBuffer implements ChannelBuffer { } return getByte(readerIndex ++); } - + @Override public boolean readBoolean() { return readByte() != 0; @@ -436,7 +442,7 @@ public abstract class AbstractChannelBuffer implements ChannelBuffer { } readerIndex = newReaderIndex; } - + @Override public void writeBoolean(boolean value) { writeByte(value ? 1 : 0); diff --git a/example/src/main/java/io/netty/example/echo/EchoServer.java b/example/src/main/java/io/netty/example/echo/EchoServer.java index 4c0ee1ed0c..c1cabcbffc 100644 --- a/example/src/main/java/io/netty/example/echo/EchoServer.java +++ b/example/src/main/java/io/netty/example/echo/EchoServer.java @@ -15,14 +15,22 @@ */ package io.netty.example.echo; -import java.net.InetSocketAddress; -import java.util.concurrent.Executors; +import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBuffers; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.channel.EventLoop; +import io.netty.channel.MultithreadEventLoop; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.SelectorEventLoop; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPipelineFactory; -import io.netty.channel.Channels; -import io.netty.channel.socket.nio.NioServerSocketChannelFactory; +import java.net.InetSocketAddress; +import java.util.ArrayDeque; +import java.util.Queue; /** * Echoes back any received data from a client. @@ -35,21 +43,52 @@ public class EchoServer { this.port = port; } - public void run() { + public void run() throws Exception { // Configure the server. - ServerBootstrap bootstrap = new ServerBootstrap( - new NioServerSocketChannelFactory( - Executors.newCachedThreadPool())); + final EventLoop loop = new MultithreadEventLoop(SelectorEventLoop.class); + ServerSocketChannel ssc = new NioServerSocketChannel(); + ssc.pipeline().addLast("acceptor", new ChannelInboundHandlerAdapter() { - // Set up the pipeline factory. - bootstrap.setPipelineFactory(new ChannelPipelineFactory() { - public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline(new EchoServerHandler()); + @Override + public ChannelBufferHolder newInboundBuffer( + ChannelInboundHandlerContext ctx) + throws Exception { + return ChannelBufferHolders.messageBuffer(new ArrayDeque()); + } + + @Override + public void inboundBufferUpdated( + ChannelInboundHandlerContext ctx) + throws Exception { + Queue in = ctx.in().messageBuffer(); + for (;;) { + SocketChannel s = in.poll(); + if (s == null) { + break; + } + s.pipeline().addLast("echoer", new ChannelInboundHandlerAdapter() { + @Override + public ChannelBufferHolder newInboundBuffer(ChannelInboundHandlerContext ctx) { + return ChannelBufferHolders.byteBuffer(ChannelBuffers.dynamicBuffer()); + } + + @Override + public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) { + ChannelBuffer in = ctx.in().byteBuffer(); + ChannelBuffer out = ctx.out().byteBuffer(); + out.discardReadBytes(); + out.writeBytes(in); + in.clear(); + ctx.flush(); + } + }); + loop.register(s); + } } }); - // Bind and start to accept incoming connections. - bootstrap.bind(new InetSocketAddress(port)); + loop.register(ssc).awaitUninterruptibly().rethrowIfFailed(); + ssc.bind(new InetSocketAddress(port), ssc.newFuture()); } public static void main(String[] args) throws Exception { diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 8fe62ea711..f416845b5d 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -20,8 +20,8 @@ import io.netty.logging.InternalLoggerFactory; import io.netty.util.DefaultAttributeMap; import io.netty.util.internal.ConcurrentHashMap; -import java.io.IOException; import java.net.SocketAddress; +import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentMap; @@ -35,9 +35,19 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha static final ConcurrentMap allChannels = new ConcurrentHashMap(); + /** + * Generates a negative unique integer ID. This method generates only + * negative integers to avoid conflicts with user-specified IDs where only + * non-negative integers are allowed. + */ private static Integer allocateId(Channel channel) { - Integer id = Integer.valueOf(System.identityHashCode(channel)); + int idVal = -Math.abs(System.identityHashCode(channel)); + if (idVal >= 0) { + idVal = -1; + } + Integer id; for (;;) { + id = Integer.valueOf(idVal); // Loop until a unique ID is acquired. // It should be found in one loop practically. if (allChannels.putIfAbsent(id, channel) == null) { @@ -45,22 +55,32 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return id; } else { // Taken by other channel at almost the same moment. - id = Integer.valueOf(id.intValue() + 1); + idVal --; + if (idVal >= 0) { + idVal = -1; + } } } } - private final Integer id; private final Channel parent; + private final Integer id; private final Unsafe unsafe; private final ChannelPipeline pipeline = new DefaultChannelPipeline(this); private final List closureListeners = new ArrayList(4); private final ChannelFuture succeededFuture = new SucceededChannelFuture(this); + private final ChannelFuture voidFuture = new VoidChannelFuture(this); private volatile EventLoop eventLoop; private volatile boolean registered; private volatile boolean notifiedClosureListeners; + + /** + * The future of the current connection attempt. If not null, subsequent + * connection attempts will fail. + */ private ChannelFuture connectFuture; + private long writtenAmount; /** Cache for the string representation of this channel */ private boolean strValActive; @@ -69,12 +89,27 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha /** * Creates a new instance. * + * @param id + * the unique non-negative integer ID of this channel. + * Specify {@code null} to auto-generate a unique negative integer + * ID. * @param parent * the parent of this channel. {@code null} if there's no parent. */ - protected AbstractChannel(Channel parent) { - id = allocateId(this); + protected AbstractChannel(Channel parent, Integer id) { + if (id == null) { + id = allocateId(this); + } else { + if (id.intValue() < 0) { + throw new IllegalArgumentException("id: " + id + " (expected: >= 0)"); + } + if (allChannels.putIfAbsent(id, this) != null) { + throw new IllegalArgumentException("duplicate ID: " + id); + } + } + this.parent = parent; + this.id = id; unsafe = new DefaultUnsafe(); closureListeners.add(new ChannelFutureListener() { @@ -85,19 +120,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha }); } - /** - * (Internal use only) Creates a new temporary instance with the specified - * ID. - * - * @param parent - * the parent of this channel. {@code null} if there's no parent. - */ - protected AbstractChannel(Integer id, Channel parent) { - this.id = id; - this.parent = parent; - unsafe = new DefaultUnsafe(); - } - @Override public final Integer id() { return id; @@ -115,61 +137,123 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public EventLoop eventLoop() { + if (eventLoop == null) { + throw new IllegalStateException("channel not registered to an event loop"); + } return eventLoop; } + @Override + public boolean isOpen() { + return unsafe().ch().isOpen(); + } + @Override public boolean isRegistered() { return registered; } @Override - public void bind(SocketAddress localAddress, ChannelFuture future) { - pipeline().bind(localAddress, future); + public ChannelFuture bind(SocketAddress localAddress) { + ChannelFuture f = newFuture(); + pipeline().bind(localAddress, f); + return f; } @Override - public void connect(SocketAddress remoteAddress, ChannelFuture future) { - pipeline().connect(remoteAddress, future); + public ChannelFuture connect(SocketAddress remoteAddress) { + ChannelFuture f = newFuture(); + pipeline().connect(remoteAddress, f); + return f; } @Override - public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) { - pipeline().connect(remoteAddress, localAddress, future); + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { + ChannelFuture f = newFuture(); + pipeline().connect(remoteAddress, localAddress, f); + return f; } @Override - public void disconnect(ChannelFuture future) { - pipeline().disconnect(future); + public ChannelFuture disconnect() { + ChannelFuture f = newFuture(); + pipeline().disconnect(f); + return f; } @Override - public void close(ChannelFuture future) { - pipeline().close(future); + public ChannelFuture close() { + ChannelFuture f = newFuture(); + pipeline().close(f); + return f; } @Override - public void deregister(ChannelFuture future) { - pipeline().deregister(future); + public ChannelFuture deregister() { + ChannelFuture f = newFuture(); + pipeline().deregister(f); + return f; + } + + @Override + public ChannelFuture flush() { + ChannelFuture f = newFuture(); + pipeline().flush(f); + return f; + } + + @Override + public ChannelFuture write(Object message) { + ChannelFuture f = newFuture(); + pipeline().write(message, f); + return f; + } + + @Override + public ChannelFuture bind(SocketAddress localAddress, ChannelFuture future) { + return pipeline().bind(localAddress, future); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, ChannelFuture future) { + return pipeline().connect(remoteAddress, future); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) { + return pipeline().connect(remoteAddress, localAddress, future); + } + + @Override + public ChannelFuture disconnect(ChannelFuture future) { + return pipeline().disconnect(future); + } + + @Override + public ChannelFuture close(ChannelFuture future) { + return pipeline().close(future); + } + + @Override + public ChannelFuture deregister(ChannelFuture future) { + return pipeline().deregister(future); } @Override public ChannelBufferHolder out() { - return pipeline().nextOut(); + return pipeline().out(); } @Override - public void flush(ChannelFuture future) { - pipeline().flush(future); + public ChannelFuture flush(ChannelFuture future) { + return pipeline().flush(future); } @Override - public void write(Object message, ChannelFuture future) { - pipeline.write(message, future); + public ChannelFuture write(Object message, ChannelFuture future) { + return pipeline.write(message, future); } - - @Override public ChannelFuture newFuture() { return new DefaultChannelFuture(this, false); @@ -185,6 +269,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return new FailedChannelFuture(this, cause); } + @Override + public ChannelFuture newVoidFuture() { + return voidFuture; + } @Override public void addClosureListener(final ChannelFutureListener listener) { @@ -370,7 +458,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public SocketAddress remoteAddress() { - // TODO Auto-generated method stub return remoteAddress0(); } @@ -385,22 +472,53 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha AbstractChannel.this.eventLoop = eventLoop; assert eventLoop().inEventLoop(); - doRegister(future); - assert future.isDone(); - if (registered = future.isSuccess()) { + + if (!ensureOpen(future)) { + return; + } + + try { + doRegister(); + registered = true; + future.setSuccess(); pipeline().fireChannelRegistered(); + } catch (Throwable t) { + // Close the channel directly to avoid FD leak. + try { + doClose(); + } catch (Throwable t2) { + logger.warn("Failed to close a channel", t2); + } + + future.setFailure(t); + pipeline().fireExceptionCaught(t); } } @Override public void bind(final SocketAddress localAddress, final ChannelFuture future) { if (eventLoop().inEventLoop()) { - doBind(localAddress, future); + if (!ensureOpen(future)) { + return; + } + + try { + boolean wasActive = isActive(); + doBind(localAddress); + future.setSuccess(); + if (!wasActive && isActive()) { + pipeline().fireChannelActive(); + } + } catch (Throwable t) { + future.setFailure(t); + pipeline().fireExceptionCaught(t); + closeIfClosed(); + } } else { eventLoop().execute(new Runnable() { @Override public void run() { - doBind(localAddress, future); + bind(localAddress, future); } }); } @@ -408,20 +526,35 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelFuture future) { - // XXX: What if a user makes a connection attempt twice? if (eventLoop().inEventLoop()) { - doConnect(remoteAddress, localAddress, future); - if (!future.isDone()) { - connectFuture = future; + if (!ensureOpen(future)) { + return; + } + + try { + if (connectFuture != null) { + throw new IllegalStateException("connection attempt already made"); + } + + boolean wasActive = isActive(); + if (doConnect(remoteAddress, localAddress)) { + future.setSuccess(); + if (!wasActive && isActive()) { + pipeline().fireChannelActive(); + } + } else { + connectFuture = future; + } + } catch (Throwable t) { + future.setFailure(t); + pipeline().fireExceptionCaught(t); + closeIfClosed(); } } else { eventLoop().execute(new Runnable() { @Override public void run() { - doConnect(remoteAddress, localAddress, future); - if (!future.isDone()) { - connectFuture = future; - } + connect(remoteAddress, localAddress, future); } }); } @@ -431,18 +564,37 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha public void finishConnect() { assert eventLoop().inEventLoop(); assert connectFuture != null; - doFinishConnect(connectFuture); + try { + doFinishConnect(); + connectFuture.setSuccess(); + } catch (Throwable t) { + connectFuture.setFailure(t); + pipeline().fireExceptionCaught(t); + closeIfClosed(); + } finally { + connectFuture = null; + } } @Override public void disconnect(final ChannelFuture future) { if (eventLoop().inEventLoop()) { - doDisconnect(future); + try { + boolean wasActive = isActive(); + doDisconnect(); + future.setSuccess(); + if (wasActive && !isActive()) { + pipeline().fireChannelInactive(); + } + } catch (Throwable t) { + future.setFailure(t); + closeIfClosed(); + } } else { eventLoop().execute(new Runnable() { @Override public void run() { - doDisconnect(future); + disconnect(future); } }); } @@ -451,14 +603,27 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public void close(final ChannelFuture future) { if (eventLoop().inEventLoop()) { - doClose(future); - notifyClosureListeners(); + if (isOpen()) { + boolean wasActive = isActive(); + try { + doClose(); + future.setSuccess(); + } catch (Throwable t) { + future.setFailure(t); + } + if (wasActive && !isActive()) { + pipeline().fireChannelInactive(); + } + notifyClosureListeners(); + } else { + // Closed already. + future.setSuccess(); + } } else { eventLoop().execute(new Runnable() { @Override public void run() { - doClose(future); - notifyClosureListeners(); + close(future); } }); } @@ -468,47 +633,99 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha public void deregister(final ChannelFuture future) { if (eventLoop().inEventLoop()) { try { - doDeregister(future); + doDeregister(); + } catch (Throwable t) { + logger.warn("Unexpected exception occurred while deregistering a channel.", t); } finally { + future.setSuccess(); registered = false; pipeline().fireChannelUnregistered(); eventLoop = null; } + } else { + eventLoop().execute(new Runnable() { + @Override + public void run() { + deregister(future); + } + }); + } + } + + @Override + public void read() { + assert eventLoop().inEventLoop(); + // FIXME: Wrap with a loop + long readAmount = 0; + try { + boolean closeIfClosed = false; + for (;;) { + int localReadAmount = doRead(); + if (localReadAmount > 0) { + readAmount += localReadAmount; + continue; + } + if (localReadAmount == 0) { + break; + } + if (localReadAmount < 0) { + closeIfClosed = true; + break; + } + } + + if (readAmount > 0) { + pipeline.fireInboundBufferUpdated(); + } + + if (closeIfClosed) { + closeIfClosed(); + } + } catch (Throwable t) { + pipeline().fireExceptionCaught(t); + closeIfClosed(); + } + } + + @Override + public void flush(final ChannelFuture future) { + // FIXME: Notify future properly using writtenAmount. + if (eventLoop().inEventLoop()) { + try { + writtenAmount += doFlush(); + } catch (Exception e) { + future.setFailure(e); + } } else { eventLoop().execute(new Runnable() { @Override public void run() { try { - doDeregister(future); - } finally { - registered = false; - pipeline().fireChannelUnregistered(); - eventLoop = null; + writtenAmount += doFlush(); + } catch (Exception e) { + future.setFailure(e); } } }); } } - @Override - public int read() throws IOException { - assert eventLoop().inEventLoop(); - return doRead(); + private boolean ensureOpen(ChannelFuture future) { + if (isOpen()) { + return true; + } + + Exception e = new ClosedChannelException(); + future.setFailure(e); + pipeline().fireExceptionCaught(e); + return false; } - @Override - public int flush(final ChannelFuture future) { - if (eventLoop().inEventLoop()) { - return doFlush(future); - } else { - eventLoop().execute(new Runnable() { - @Override - public void run() { - doFlush(future); - } - }); - return -1; // Unknown + private void closeIfClosed() { + if (isOpen()) { + return; } + close(newFuture()); } } @@ -518,14 +735,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha protected abstract SocketAddress localAddress0(); protected abstract SocketAddress remoteAddress0(); - protected abstract void doRegister(ChannelFuture future); - protected abstract void doBind(SocketAddress localAddress, ChannelFuture future); - protected abstract void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future); - protected abstract void doFinishConnect(ChannelFuture future); - protected abstract void doDisconnect(ChannelFuture future); - protected abstract void doClose(ChannelFuture future); - protected abstract void doDeregister(ChannelFuture future); + protected abstract void doRegister() throws Exception; + protected abstract void doBind(SocketAddress localAddress) throws Exception; + protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception; + protected abstract void doFinishConnect() throws Exception; + protected abstract void doDisconnect() throws Exception; + protected abstract void doClose() throws Exception; + protected abstract void doDeregister() throws Exception; - protected abstract int doRead(); - protected abstract int doFlush(ChannelFuture future); + protected abstract int doRead() throws Exception; + protected abstract int doFlush() throws Exception; } diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index 8c9a48d541..ccc9f1b9fc 100644 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -37,8 +37,8 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S /** * Creates a new instance. */ - protected AbstractServerChannel() { - super(null); + protected AbstractServerChannel(Integer id) { + super(null, id); } @Override @@ -62,24 +62,23 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S } @Override - protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) { - future.setFailure(new UnsupportedOperationException()); + protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + throw new UnsupportedOperationException(); } @Override - protected void doFinishConnect(ChannelFuture future) { - future.setFailure(new UnsupportedOperationException()); + protected void doFinishConnect() throws Exception { + throw new UnsupportedOperationException(); } @Override - protected void doDisconnect(ChannelFuture future) { - future.setFailure(new UnsupportedOperationException()); + protected void doDisconnect() throws Exception { + throw new UnsupportedOperationException(); } @Override - protected int doFlush(ChannelFuture future) { - future.setFailure(new UnsupportedOperationException()); - return 0; + protected int doFlush() throws Exception { + throw new UnsupportedOperationException(); } private static class NoopQueue extends AbstractQueue { diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index b867a97fce..ba9e6b1b80 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -21,7 +21,6 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannelConfig; import io.netty.util.AttributeMap; -import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.SelectionKey; @@ -106,7 +105,7 @@ import java.nio.channels.SelectionKey; * * @apiviz.exclude ^io\.netty\.channel\.([a-z]+\.)+[^\.]+Channel$ */ -public interface Channel extends AttributeMap, ChannelFutureFactory, Comparable { +public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFutureFactory, Comparable { /** * Returns the unique integer ID of this channel. @@ -134,6 +133,7 @@ public interface Channel extends AttributeMap, ChannelFutureFactory, Comparable< */ ChannelPipeline pipeline(); + boolean isOpen(); boolean isRegistered(); boolean isActive(); @@ -164,17 +164,6 @@ public interface Channel extends AttributeMap, ChannelFutureFactory, Comparable< */ SocketAddress remoteAddress(); - void bind(SocketAddress localAddress, ChannelFuture future); - void connect(SocketAddress remoteAddress, ChannelFuture future); - void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future); - void disconnect(ChannelFuture future); - void close(ChannelFuture future); - void deregister(ChannelFuture future); - - ChannelBufferHolder out(); - void flush(ChannelFuture future); - void write(Object message, ChannelFuture future); - // FIXME: Introduce more flexible channel state notification mechanism // - notify me when channel becomes (un)registered, (in)active void addClosureListener(ChannelFutureListener listener); @@ -197,7 +186,7 @@ public interface Channel extends AttributeMap, ChannelFutureFactory, Comparable< void close(ChannelFuture future); void deregister(ChannelFuture future); - int read() throws IOException; - int flush(ChannelFuture future); + void read(); + void flush(ChannelFuture future); } } diff --git a/transport/src/main/java/io/netty/channel/ChannelFutureFactory.java b/transport/src/main/java/io/netty/channel/ChannelFutureFactory.java index c07bc03d8d..857b0623f4 100644 --- a/transport/src/main/java/io/netty/channel/ChannelFutureFactory.java +++ b/transport/src/main/java/io/netty/channel/ChannelFutureFactory.java @@ -4,4 +4,5 @@ public interface ChannelFutureFactory { ChannelFuture newFuture(); ChannelFuture newSucceededFuture(); ChannelFuture newFailedFuture(Throwable cause); + ChannelFuture newVoidFuture(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java index db914adb70..8d222cf258 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java @@ -114,9 +114,9 @@ public class ChannelHandlerAdapter implements ChannelInboundHandler, Ch @Override public void flush(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { - if (ctx.out().hasMessageBuffer()) { - Queue out = ctx.out().messageBuffer(); - Queue nextOut = ctx.nextOut().messageBuffer(); + if (ctx.prevOut().hasMessageBuffer()) { + Queue out = ctx.prevOut().messageBuffer(); + Queue nextOut = ctx.out().messageBuffer(); for (;;) { O msg = out.poll(); if (msg == null) { @@ -125,8 +125,8 @@ public class ChannelHandlerAdapter implements ChannelInboundHandler, Ch nextOut.add(msg); } } else { - ChannelBuffer out = ctx.out().byteBuffer(); - ChannelBuffer nextOut = ctx.nextOut().byteBuffer(); + ChannelBuffer out = ctx.prevOut().byteBuffer(); + ChannelBuffer nextOut = ctx.out().byteBuffer(); nextOut.writeBytes(out); } ctx.flush(future); diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java index 2ab0e83085..581d4ac832 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java @@ -122,7 +122,9 @@ import java.nio.channels.Channels; * pipeline, and how to handle the event in your application. * @apiviz.owns io.netty.channel.ChannelHandler */ -public interface ChannelHandlerContext extends AttributeMap, ChannelHandlerInvoker, ChannelFutureFactory { +public interface ChannelHandlerContext + extends AttributeMap, ChannelFutureFactory, + ChannelInboundInvoker, ChannelOutboundInvoker { Channel channel(); ChannelPipeline pipeline(); diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerInvoker.java b/transport/src/main/java/io/netty/channel/ChannelHandlerInvoker.java deleted file mode 100644 index 23d610f399..0000000000 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerInvoker.java +++ /dev/null @@ -1,26 +0,0 @@ -package io.netty.channel; - -import java.net.SocketAddress; - -public interface ChannelHandlerInvoker { - ChannelBufferHolder nextIn(); - ChannelBufferHolder nextOut(); - - void fireChannelRegistered(); - void fireChannelUnregistered(); - void fireChannelActive(); - void fireChannelInactive(); - void fireExceptionCaught(Throwable cause); - void fireUserEventTriggered(Object event); - void fireInboundBufferUpdated(); - - void bind(SocketAddress localAddress, ChannelFuture future); - void connect(SocketAddress remoteAddress, ChannelFuture future); - void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future); - void disconnect(ChannelFuture future); - void close(ChannelFuture future); - void deregister(ChannelFuture future); - void flush(ChannelFuture future); - - void write(Object message, ChannelFuture future); -} diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java new file mode 100644 index 0000000000..993b80ad91 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java @@ -0,0 +1,14 @@ +package io.netty.channel; + + +public interface ChannelInboundInvoker { + ChannelBufferHolder nextIn(); + + void fireChannelRegistered(); + void fireChannelUnregistered(); + void fireChannelActive(); + void fireChannelInactive(); + void fireExceptionCaught(Throwable cause); + void fireUserEventTriggered(Object event); + void fireInboundBufferUpdated(); +} diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java index f4f46378b7..2a7a8d78a3 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java @@ -59,9 +59,9 @@ public class ChannelOutboundHandlerAdapter implements ChannelOutboundHandler< @Override public void flush(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { - if (ctx.out().hasMessageBuffer()) { - Queue out = ctx.out().messageBuffer(); - Queue nextOut = ctx.nextOut().messageBuffer(); + if (ctx.prevOut().hasMessageBuffer()) { + Queue out = ctx.prevOut().messageBuffer(); + Queue nextOut = ctx.out().messageBuffer(); for (;;) { O msg = out.poll(); if (msg == null) { @@ -70,8 +70,8 @@ public class ChannelOutboundHandlerAdapter implements ChannelOutboundHandler< nextOut.add(msg); } } else { - ChannelBuffer out = ctx.out().byteBuffer(); - ChannelBuffer nextOut = ctx.nextOut().byteBuffer(); + ChannelBuffer out = ctx.prevOut().byteBuffer(); + ChannelBuffer nextOut = ctx.out().byteBuffer(); nextOut.writeBytes(out); } ctx.flush(future); diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerContext.java index de8ec5d7fa..5b0471fbf8 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerContext.java @@ -2,5 +2,5 @@ package io.netty.channel; public interface ChannelOutboundHandlerContext extends ChannelHandlerContext { - ChannelBufferHolder out(); + ChannelBufferHolder prevOut(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java new file mode 100644 index 0000000000..9e5eb33fae --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java @@ -0,0 +1,25 @@ +package io.netty.channel; + +import java.net.SocketAddress; + +public interface ChannelOutboundInvoker { + ChannelBufferHolder out(); + + ChannelFuture bind(SocketAddress localAddress); + ChannelFuture connect(SocketAddress remoteAddress); + ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress); + ChannelFuture disconnect(); + ChannelFuture close(); + ChannelFuture deregister(); + ChannelFuture flush(); + ChannelFuture write(Object message); + + ChannelFuture bind(SocketAddress localAddress, ChannelFuture future); + ChannelFuture connect(SocketAddress remoteAddress, ChannelFuture future); + ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future); + ChannelFuture disconnect(ChannelFuture future); + ChannelFuture close(ChannelFuture future); + ChannelFuture deregister(ChannelFuture future); + ChannelFuture flush(ChannelFuture future); + ChannelFuture write(Object message, ChannelFuture future); +} diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java index 39c7ebabca..4c6351aed2 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java @@ -19,6 +19,7 @@ import io.netty.buffer.ChannelBuffer; import java.io.InputStream; import java.io.OutputStream; +import java.nio.channels.Channels; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -202,7 +203,7 @@ import java.util.NoSuchElementException; * @apiviz.owns io.netty.channel.ChannelHandler * @apiviz.uses io.netty.channel.ChannelSink - - sends events downstream */ -public interface ChannelPipeline extends ChannelHandlerInvoker { +public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker { /** * Inserts a {@link ChannelHandler} at the first position of this pipeline. diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 47ba06f0fa..940a6bb22e 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -521,7 +521,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelBufferHolder nextOut() { + public ChannelBufferHolder out() { return channel().unsafe().out(); } @@ -657,12 +657,68 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public void bind(SocketAddress localAddress, ChannelFuture future) { - bind(firstOutboundContext(), localAddress, future); + public ChannelFuture bind(SocketAddress localAddress) { + ChannelFuture f = channel().newFuture(); + bind(localAddress, f); + return f; + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress) { + ChannelFuture f = channel().newFuture(); + connect(remoteAddress, f); + return f; + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { + ChannelFuture f = channel().newFuture(); + connect(remoteAddress, localAddress, f); + return f; + } + + @Override + public ChannelFuture disconnect() { + ChannelFuture f = channel().newFuture(); + disconnect(f); + return f; + } + + @Override + public ChannelFuture close() { + ChannelFuture f = channel().newFuture(); + close(f); + return f; + } + + @Override + public ChannelFuture deregister() { + ChannelFuture f = channel().newFuture(); + deregister(f); + return f; + } + + @Override + public ChannelFuture flush() { + ChannelFuture f = channel().newFuture(); + flush(f); + return f; + } + + @Override + public ChannelFuture write(Object message) { + ChannelFuture f = channel().newFuture(); + write(message, f); + return f; + } + + @Override + public ChannelFuture bind(SocketAddress localAddress, ChannelFuture future) { + return bind(firstOutboundContext(), localAddress, future); } @SuppressWarnings("unchecked") - private void bind(DefaultChannelHandlerContext ctx, SocketAddress localAddress, ChannelFuture future) { + private ChannelFuture bind(DefaultChannelHandlerContext ctx, SocketAddress localAddress, ChannelFuture future) { if (localAddress == null) { throw new NullPointerException("localAddress"); } @@ -675,20 +731,21 @@ public class DefaultChannelPipeline implements ChannelPipeline { } else { channel().unsafe().bind(localAddress, future); } + return future; } @Override - public void connect(SocketAddress remoteAddress, ChannelFuture future) { - connect(remoteAddress, null, future); + public ChannelFuture connect(SocketAddress remoteAddress, ChannelFuture future) { + return connect(remoteAddress, null, future); } @Override - public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) { - connect(firstOutboundContext(), remoteAddress, localAddress, future); + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) { + return connect(firstOutboundContext(), remoteAddress, localAddress, future); } @SuppressWarnings("unchecked") - private void connect(DefaultChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) { + private ChannelFuture connect(DefaultChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } @@ -702,15 +759,17 @@ public class DefaultChannelPipeline implements ChannelPipeline { } else { channel().unsafe().connect(remoteAddress, localAddress, future); } + + return future; } @Override - public void disconnect(ChannelFuture future) { - disconnect(firstOutboundContext(), future); + public ChannelFuture disconnect(ChannelFuture future) { + return disconnect(firstOutboundContext(), future); } @SuppressWarnings("unchecked") - private void disconnect(DefaultChannelHandlerContext ctx, ChannelFuture future) { + private ChannelFuture disconnect(DefaultChannelHandlerContext ctx, ChannelFuture future) { if (ctx != null) { try { ((ChannelOutboundHandler) ctx.handler()).disconnect(ctx, future); @@ -720,15 +779,17 @@ public class DefaultChannelPipeline implements ChannelPipeline { } else { channel().unsafe().disconnect(future); } + + return future; } @Override - public void close(ChannelFuture future) { - close(firstOutboundContext(), future); + public ChannelFuture close(ChannelFuture future) { + return close(firstOutboundContext(), future); } @SuppressWarnings("unchecked") - private void close(DefaultChannelHandlerContext ctx, ChannelFuture future) { + private ChannelFuture close(DefaultChannelHandlerContext ctx, ChannelFuture future) { if (ctx != null) { try { ((ChannelOutboundHandler) ctx.handler()).close(ctx, future); @@ -738,15 +799,17 @@ public class DefaultChannelPipeline implements ChannelPipeline { } else { channel().unsafe().close(future); } + + return future; } @Override - public void deregister(final ChannelFuture future) { - deregister(firstOutboundContext(), future); + public ChannelFuture deregister(final ChannelFuture future) { + return deregister(firstOutboundContext(), future); } @SuppressWarnings("unchecked") - private void deregister(DefaultChannelHandlerContext ctx, ChannelFuture future) { + private ChannelFuture deregister(DefaultChannelHandlerContext ctx, ChannelFuture future) { if (ctx != null) { try { ((ChannelOutboundHandler) ctx.handler()).deregister(ctx, future); @@ -756,46 +819,39 @@ public class DefaultChannelPipeline implements ChannelPipeline { } else { channel().unsafe().deregister(future); } + + return future; } @Override - public void flush(ChannelFuture future) { - DefaultChannelHandlerContext ctx = firstOutboundContext(); - if (ctx != null) { - flush(ctx, future); - } else { - channel().unsafe().flush(future); - } + public ChannelFuture flush(ChannelFuture future) { + return flush(firstOutboundContext(), future); } @SuppressWarnings("unchecked") - private void flush(DefaultChannelHandlerContext ctx, ChannelFuture future) { - try { - ((ChannelOutboundHandler) ctx.handler()).flush(ctx, future); - } catch (Throwable t) { - notifyHandlerException(t); + private ChannelFuture flush(DefaultChannelHandlerContext ctx, ChannelFuture future) { + if (ctx != null) { + try { + ((ChannelOutboundHandler) ctx.handler()).flush(ctx, future); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + channel().unsafe().flush(future); } + + return future; } @Override - public void write(Object message, ChannelFuture future) { + public ChannelFuture write(Object message, ChannelFuture future) { if (message instanceof ChannelBuffer) { ChannelBuffer m = (ChannelBuffer) message; - nextOut().byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes()); + out().byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes()); } else { - nextOut().messageBuffer().add(message); + out().messageBuffer().add(message); } - flush(future); - } - - private static void write(DefaultChannelHandlerContext ctx, Object message, ChannelFuture future) { - if (message instanceof ChannelBuffer) { - ChannelBuffer m = (ChannelBuffer) message; - ctx.nextOut().byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes()); - } else { - ctx.nextOut().messageBuffer().add(message); - } - ctx.flush(future); + return flush(future); } private DefaultChannelHandlerContext firstInboundContext() { @@ -922,7 +978,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { private final boolean canHandleInbound; private final boolean canHandleOutbound; private final ChannelBufferHolder in; - private final ChannelBufferHolder out; + private final ChannelBufferHolder prevOut; @SuppressWarnings("unchecked") DefaultChannelHandlerContext( @@ -961,7 +1017,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } if (canHandleOutbound) { try { - out = ((ChannelOutboundHandler) handler).newOutboundBuffer(this); + prevOut = ((ChannelOutboundHandler) handler).newOutboundBuffer(this); } catch (Exception e) { throw new ChannelPipelineException("A user handler failed to create a new outbound buffer.", e); } finally { @@ -970,7 +1026,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } } else { - out = null; + prevOut = null; } } @@ -1010,8 +1066,8 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelBufferHolder out() { - return out; + public ChannelBufferHolder prevOut() { + return prevOut; } @Override @@ -1025,10 +1081,10 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelBufferHolder nextOut() { + public ChannelBufferHolder out() { DefaultChannelHandlerContext next = nextOutboundContext(prev); if (next != null) { - return next.out(); + return next.prevOut(); } else { return channel().unsafe().out(); } @@ -1091,43 +1147,105 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public void bind(SocketAddress localAddress, ChannelFuture future) { - DefaultChannelPipeline.this.bind(nextOutboundContext(prev), localAddress, future); + public ChannelFuture bind(SocketAddress localAddress) { + ChannelFuture f = newFuture(); + bind(localAddress, f); + return f; } @Override - public void connect(SocketAddress remoteAddress, ChannelFuture future) { - connect(remoteAddress, null, future); + public ChannelFuture connect(SocketAddress remoteAddress) { + ChannelFuture f = newFuture(); + connect(remoteAddress, f); + return f; } @Override - public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) { - DefaultChannelPipeline.this.connect(nextOutboundContext(prev), remoteAddress, localAddress, future); + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { + ChannelFuture f = newFuture(); + connect(remoteAddress, localAddress, f); + return f; } @Override - public void disconnect(ChannelFuture future) { - DefaultChannelPipeline.this.disconnect(nextOutboundContext(prev), future); + public ChannelFuture disconnect() { + ChannelFuture f = newFuture(); + disconnect(f); + return f; } @Override - public void close(ChannelFuture future) { - DefaultChannelPipeline.this.close(nextOutboundContext(prev), future); + public ChannelFuture close() { + ChannelFuture f = newFuture(); + close(f); + return f; } @Override - public void deregister(ChannelFuture future) { - DefaultChannelPipeline.this.deregister(nextOutboundContext(prev), future); + public ChannelFuture deregister() { + ChannelFuture f = newFuture(); + deregister(f); + return f; } @Override - public void flush(ChannelFuture future) { - DefaultChannelPipeline.this.flush(nextOutboundContext(prev), future); + public ChannelFuture flush() { + ChannelFuture f = newFuture(); + flush(f); + return f; } @Override - public void write(Object message, ChannelFuture future) { - DefaultChannelPipeline.write(nextOutboundContext(prev), message, future); + public ChannelFuture write(Object message) { + ChannelFuture f = newFuture(); + write(message, f); + return f; + } + + @Override + public ChannelFuture bind(SocketAddress localAddress, ChannelFuture future) { + return DefaultChannelPipeline.this.bind(nextOutboundContext(prev), localAddress, future); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, ChannelFuture future) { + return connect(remoteAddress, null, future); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) { + return DefaultChannelPipeline.this.connect(nextOutboundContext(prev), remoteAddress, localAddress, future); + } + + @Override + public ChannelFuture disconnect(ChannelFuture future) { + return DefaultChannelPipeline.this.disconnect(nextOutboundContext(prev), future); + } + + @Override + public ChannelFuture close(ChannelFuture future) { + return DefaultChannelPipeline.this.close(nextOutboundContext(prev), future); + } + + @Override + public ChannelFuture deregister(ChannelFuture future) { + return DefaultChannelPipeline.this.deregister(nextOutboundContext(prev), future); + } + + @Override + public ChannelFuture flush(ChannelFuture future) { + return DefaultChannelPipeline.this.flush(nextOutboundContext(prev), future); + } + + @Override + public ChannelFuture write(Object message, ChannelFuture future) { + if (message instanceof ChannelBuffer) { + ChannelBuffer m = (ChannelBuffer) message; + out().byteBuffer().writeBytes(m, m.readerIndex(), m.readableBytes()); + } else { + out().messageBuffer().add(message); + } + return flush(future); } @Override @@ -1144,5 +1262,10 @@ public class DefaultChannelPipeline implements ChannelPipeline { public ChannelFuture newFailedFuture(Throwable cause) { return channel().newFailedFuture(cause); } + + @Override + public ChannelFuture newVoidFuture() { + return channel().newVoidFuture(); + } } } diff --git a/transport/src/main/java/io/netty/channel/EventLoop.java b/transport/src/main/java/io/netty/channel/EventLoop.java index 0e2f2f0113..e9bf14f77a 100644 --- a/transport/src/main/java/io/netty/channel/EventLoop.java +++ b/transport/src/main/java/io/netty/channel/EventLoop.java @@ -4,6 +4,6 @@ import java.util.concurrent.ExecutorService; public interface EventLoop extends ExecutorService { ChannelFuture register(Channel channel); - EventLoop register(Channel channel, ChannelFuture future); + ChannelFuture register(Channel channel, ChannelFuture future); boolean inEventLoop(); } diff --git a/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java b/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java index d92b17a660..60b4d8a648 100644 --- a/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java @@ -17,6 +17,9 @@ public class MultithreadEventLoop implements EventLoop { private final EventLoop[] children; private final AtomicInteger childIndex = new AtomicInteger(); + public MultithreadEventLoop(Class loopType) { + this(loopType, Runtime.getRuntime().availableProcessors() * 2); + } public MultithreadEventLoop(Class loopType, int nThreads) { this(loopType, nThreads, Executors.defaultThreadFactory()); @@ -35,13 +38,17 @@ public class MultithreadEventLoop implements EventLoop { children = new EventLoop[nThreads]; for (int i = 0; i < nThreads; i ++) { + boolean success = false; try { children[i] = loopType.getConstructor(ThreadFactory.class).newInstance(threadFactory); + success = true; } catch (Exception e) { throw new EventLoopException("failed to create a child event loop: " + loopType.getName(), e); } finally { - for (int j = 0; j < i; j ++) { - children[j].shutdown(); + if (!success) { + for (int j = 0; j < i; j ++) { + children[j].shutdown(); + } } } } @@ -152,7 +159,7 @@ public class MultithreadEventLoop implements EventLoop { } @Override - public EventLoop register(Channel channel, ChannelFuture future) { + public ChannelFuture register(Channel channel, ChannelFuture future) { return nextEventLoop().register(channel, future); } diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index 2de1538ed1..4c3ea3a4ae 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -51,13 +51,13 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl @Override public ChannelFuture register(Channel channel) { - ChannelFuture future = new DefaultChannelFuture(channel, false); + ChannelFuture future = channel.newFuture(); register(channel, future); return future; } @Override - public EventLoop register(final Channel channel, final ChannelFuture future) { + public ChannelFuture register(final Channel channel, final ChannelFuture future) { if (inEventLoop()) { channel.unsafe().register(this, future); } else { @@ -68,7 +68,7 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl } }); } - return this; + return future; } protected void interruptThread() { diff --git a/transport/src/main/java/io/netty/channel/VoidChannelFuture.java b/transport/src/main/java/io/netty/channel/VoidChannelFuture.java new file mode 100644 index 0000000000..226b255358 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/VoidChannelFuture.java @@ -0,0 +1,129 @@ +package io.netty.channel; + +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class VoidChannelFuture implements ChannelFuture { + + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(VoidChannelFuture.class); + + private final Channel channel; + + /** + * Creates a new instance. + * + * @param channel the {@link Channel} associated with this future + */ + public VoidChannelFuture(Channel channel) { + if (channel == null) { + throw new NullPointerException("channel"); + } + this.channel = channel; + } + + @Override + public void addListener(final ChannelFutureListener listener) { + fail(); + } + + @Override + public void removeListener(ChannelFutureListener listener) { + // NOOP + } + + @Override + public ChannelFuture await() throws InterruptedException { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + return this; + } + + @Override + public boolean await(long timeout, TimeUnit unit) throws InterruptedException { + fail(); + return false; + } + + @Override + public boolean await(long timeoutMillis) throws InterruptedException { + fail(); + return false; + } + + @Override + public ChannelFuture awaitUninterruptibly() { + fail(); + return this; + } + + @Override + public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { + fail(); + return false; + } + + @Override + public boolean awaitUninterruptibly(long timeoutMillis) { + fail(); + return false; + } + + @Override + public Channel channel() { + return channel; + } + + @Override + public boolean isDone() { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isSuccess() { + return false; + } + + @Override + public Throwable cause() { + return null; + } + + @Override + public ChannelFuture rethrowIfFailed() throws Exception { + fail(); + return this; + } + + @Override + public boolean setProgress(long amount, long current, long total) { + return false; + } + + @Override + public boolean setFailure(Throwable cause) { + return false; + } + + @Override + public boolean setSuccess() { + return false; + } + + @Override + public boolean cancel() { + return false; + } + + private static void fail() { + throw new IllegalStateException("void future"); + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/DatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/DatagramChannel.java index dc465dcddd..88dd680db7 100644 --- a/transport/src/main/java/io/netty/channel/socket/DatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/DatagramChannel.java @@ -45,6 +45,8 @@ public interface DatagramChannel extends Channel { */ void joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface, ChannelFuture future); + void joinGroup(InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source, ChannelFuture future); + /** * Leaves a multicast group. */ diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java index 7b6f469359..2c11e1c6e5 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -18,7 +18,6 @@ package io.netty.channel.socket.nio; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFuture; import java.net.InetSocketAddress; import java.nio.channels.SelectableChannel; @@ -33,13 +32,8 @@ public abstract class AbstractNioChannel extends AbstractChannel { private volatile SelectionKey selectionKey; - protected AbstractNioChannel(Integer id, Channel parent, SelectableChannel ch) { - super(id, parent); - this.ch = ch; - } - - protected AbstractNioChannel(Channel parent, SelectableChannel ch) { - super(parent); + protected AbstractNioChannel(Channel parent, Integer id, SelectableChannel ch) { + super(parent, id); this.ch = ch; } @@ -87,16 +81,12 @@ public abstract class AbstractNioChannel extends AbstractChannel { public abstract NioChannelConfig config(); @Override - protected void doRegister(ChannelFuture future) { + protected void doRegister() throws Exception { if (!(eventLoop() instanceof SelectorEventLoop)) { throw new ChannelException("unsupported event loop: " + eventLoop().getClass().getName()); } SelectorEventLoop loop = (SelectorEventLoop) eventLoop(); - try { - selectionKey = javaChannel().register(loop.selector, javaChannel().validOps() & ~SelectionKey.OP_WRITE, this); - } catch (Exception e) { - throw new ChannelException("failed to register a channel", e); - } + selectionKey = javaChannel().register(loop.selector, javaChannel().validOps() & ~SelectionKey.OP_WRITE, this); } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index b2d89e5f10..89851b819e 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -17,7 +17,6 @@ package io.netty.channel.socket.nio; import io.netty.channel.AbstractServerChannel; import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFuture; import io.netty.channel.socket.DefaultServerSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannelConfig; import io.netty.logging.InternalLogger; @@ -29,7 +28,7 @@ import java.net.SocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; -final class NioServerSocketChannel extends AbstractServerChannel +public class NioServerSocketChannel extends AbstractServerChannel implements io.netty.channel.socket.ServerSocketChannel { private static final InternalLogger logger = @@ -41,6 +40,8 @@ final class NioServerSocketChannel extends AbstractServerChannel private volatile SelectionKey selectionKey; public NioServerSocketChannel() { + super(null); + try { socket = ServerSocketChannel.open(); } catch (IOException e) { @@ -109,73 +110,37 @@ final class NioServerSocketChannel extends AbstractServerChannel } @Override - protected void doRegister(ChannelFuture future) { + protected void doRegister() throws Exception { if (!(eventLoop() instanceof SelectorEventLoop)) { throw new ChannelException("unsupported event loop: " + eventLoop().getClass().getName()); } SelectorEventLoop loop = (SelectorEventLoop) eventLoop(); - try { - selectionKey = javaChannel().register(loop.selector, javaChannel().validOps(), this); - } catch (Exception e) { - throw new ChannelException("failed to register a channel", e); - } + selectionKey = javaChannel().register(loop.selector, javaChannel().validOps(), this); } @Override - protected void doBind(SocketAddress localAddress, ChannelFuture future) { - try { - javaChannel().socket().bind(localAddress); - future.setSuccess(); - pipeline().fireChannelActive(); - } catch (Exception e) { - future.setFailure(e); - } + protected void doBind(SocketAddress localAddress) throws Exception { + javaChannel().socket().bind(localAddress); } @Override - protected void doClose(ChannelFuture future) { - try { - javaChannel().close(); - } catch (Exception e) { - logger.warn("Failed to close a channel.", e); - } - - future.setSuccess(); - pipeline().fireChannelInactive(); - - if (isRegistered()) { - deregister(null); - } + protected void doClose() throws Exception { + javaChannel().close(); } @Override - protected void doDeregister(ChannelFuture future) { - try { - selectionKey.cancel(); - future.setSuccess(); - pipeline().fireChannelUnregistered(); - } catch (Exception e) { - future.setFailure(e); - } + protected void doDeregister() throws Exception { + selectionKey.cancel(); } @Override - protected int doRead() { - int acceptedConns = 0; - for (;;) { - try { - java.nio.channels.SocketChannel ch = javaChannel().accept(); - if (ch == null) { - break; - } - pipeline().nextIn().messageBuffer().add(new NioSocketChannel(this, ch)); - } catch (ChannelException e) { - pipeline().fireExceptionCaught(e); - } catch (Exception e) { - pipeline().fireExceptionCaught(new ChannelException("failed to accept a connection", e)); - } + protected int doRead() throws Exception { + java.nio.channels.SocketChannel ch = javaChannel().accept(); + if (ch == null) { + return 0; } - return acceptedConns; + pipeline().nextIn().messageBuffer().add(new NioSocketChannel(this, null, ch)); + return 1; } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index d08d5596b7..55c7073b52 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -21,14 +21,12 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFuture; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import java.io.IOException; import java.net.SocketAddress; import java.nio.channels.AsynchronousCloseException; -import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; @@ -40,43 +38,35 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha private final ChannelBufferHolder out = ChannelBufferHolders.byteBuffer(ChannelBuffers.dynamicBuffer()); private static SocketChannel newSocket() { - SocketChannel socket; try { - socket = SocketChannel.open(); + return SocketChannel.open(); } catch (IOException e) { throw new ChannelException("Failed to open a socket.", e); } + } - boolean success = false; + public NioSocketChannel() { + this(null, null, newSocket()); + } + + public NioSocketChannel(Channel parent, Integer id, SocketChannel socket) { + super(parent, id, socket); try { socket.configureBlocking(false); - success = true; } catch (IOException e) { - throw new ChannelException("Failed to enter non-blocking mode.", e); - } finally { - if (!success) { - try { - socket.close(); - } catch (IOException e) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to close a partially initialized socket.", - e); - } - + try { + socket.close(); + } catch (IOException e2) { + if (logger.isWarnEnabled()) { + logger.warn( + "Failed to close a partially initialized socket.", e2); } + } + + throw new ChannelException("Failed to enter non-blocking mode.", e); } - return socket; - } - - public NioSocketChannel(Channel parent) { - this(parent, newSocket()); - } - - public NioSocketChannel(Channel parent, SocketChannel socket) { - super(parent, socket); config = new DefaultNioSocketChannelConfig(socket.socket()); } @@ -113,120 +103,58 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha } @Override - protected void doBind(SocketAddress localAddress, ChannelFuture future) { - try { - javaChannel().socket().bind(localAddress); - future.setSuccess(); - } catch (Exception e) { - future.setFailure(e); - } + protected void doBind(SocketAddress localAddress) throws Exception { + javaChannel().socket().bind(localAddress); } @Override - protected void doConnect(SocketAddress remoteAddress, - SocketAddress localAddress, ChannelFuture future) { + protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { - try { - javaChannel().socket().bind(localAddress); - } catch (Exception e) { - future.setFailure(e); + javaChannel().socket().bind(localAddress); + } + + boolean success = false; + try { + boolean connected = javaChannel().connect(remoteAddress); + success = true; + return connected; + } finally { + if (!success) { + doClose(); } } + } - try { - if (javaChannel().connect(remoteAddress)) { - future.setSuccess(); - pipeline().fireChannelActive(); - } - } catch (Exception e) { - future.setFailure(e); - close(null); + @Override + protected void doFinishConnect() throws Exception { + if (!javaChannel().finishConnect()) { + throw new Error(); } } @Override - protected void doFinishConnect(ChannelFuture future) { - try { - if (javaChannel().finishConnect()) { - future.setSuccess(); - pipeline().fireChannelActive(); - } - } catch (Exception e) { - future.setFailure(e); - close(null); - } + protected void doDisconnect() throws Exception { + doClose(); } @Override - protected void doDisconnect(ChannelFuture future) { - doClose(future); + protected void doClose() throws Exception { + javaChannel().close(); } @Override - protected void doClose(ChannelFuture future) { - try { - javaChannel().close(); - } catch (Exception e) { - logger.warn("Failed to close a channel.", e); - } - - future.setSuccess(); - pipeline().fireChannelInactive(); - - if (isRegistered()) { - deregister(null); - } + protected void doDeregister() throws Exception { + selectionKey().cancel(); } @Override - protected void doDeregister(ChannelFuture future) { - try { - selectionKey().cancel(); - future.setSuccess(); - pipeline().fireChannelUnregistered(); - } catch (Exception e) { - future.setFailure(e); - } - } - - @Override - protected int doRead() { - final SocketChannel ch = javaChannel(); - - int ret = 0; - int readBytes = 0; - boolean failure = true; - + protected int doRead() throws Exception { ChannelBuffer buf = pipeline().nextIn().byteBuffer(); - try { - while ((ret = buf.writeBytes(ch, buf.writableBytes())) > 0) { - readBytes += ret; - if (!buf.writable()) { - break; - } - } - failure = false; - } catch (ClosedChannelException e) { - // Can happen, and does not need a user attention. - } catch (Throwable t) { - pipeline().fireExceptionCaught(t); - } - - if (readBytes > 0) { - pipeline().fireInboundBufferUpdated(); - } - - if (ret < 0 || failure) { - selectionKey().cancel(); // Some JDK implementations run into an infinite loop without this. - close(null); - return -1; - } - - return readBytes; + return buf.writeBytes(javaChannel(), buf.writableBytes()); } @Override - protected int doFlush(ChannelFuture future) { + protected int doFlush() throws Exception { boolean open = true; boolean addOpWrite = false; boolean removeOpWrite = false; @@ -236,26 +164,22 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha final ChannelBuffer buf = unsafe().out().byteBuffer(); int bytesLeft = buf.readableBytes(); if (bytesLeft == 0) { - future.setSuccess(); return 0; } - int readerIndex = buf.readerIndex(); int localWrittenBytes = 0; int writtenBytes = 0; try { for (int i = writeSpinCount; i > 0; i --) { - localWrittenBytes = buf.getBytes(readerIndex, ch, bytesLeft); + localWrittenBytes = buf.readBytes(ch, bytesLeft); if (localWrittenBytes > 0) { bytesLeft -= localWrittenBytes; if (bytesLeft <= 0) { removeOpWrite = true; - future.setSuccess(); break; } - readerIndex += localWrittenBytes; writtenBytes += localWrittenBytes; } else { addOpWrite = true; @@ -265,11 +189,10 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha } catch (AsynchronousCloseException e) { // Doesn't need a user attention - ignore. } catch (Throwable t) { - future.setFailure(t); - pipeline().fireExceptionCaught(t); if (t instanceof IOException) { open = false; - close(null); + selectionKey().cancel(); + ch.close(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java b/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java index 61683ceedb..c0a8d5b6e5 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java @@ -33,7 +33,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; -abstract class SelectorEventLoop extends SingleThreadEventLoop { +public class SelectorEventLoop extends SingleThreadEventLoop { /** * Internal Netty logger. */ @@ -57,19 +57,19 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation - protected SelectorEventLoop() { + public SelectorEventLoop() { this(Executors.defaultThreadFactory()); } - protected SelectorEventLoop(ThreadFactory threadFactory) { + public SelectorEventLoop(ThreadFactory threadFactory) { this(threadFactory, SelectorProvider.provider()); } - protected SelectorEventLoop(SelectorProvider selectorProvider) { + public SelectorEventLoop(SelectorProvider selectorProvider) { this(Executors.defaultThreadFactory(), selectorProvider); } - protected SelectorEventLoop(ThreadFactory threadFactory, SelectorProvider selectorProvider) { + public SelectorEventLoop(ThreadFactory threadFactory, SelectorProvider selectorProvider) { super(threadFactory); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); @@ -190,7 +190,8 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) { - if (ch.unsafe().read() < 0) { + ch.unsafe().read(); + if (!ch.isOpen()) { // Connection already closed - no need to handle write. continue; } @@ -265,4 +266,11 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop { } return false; } + + @Override + protected void wakeup(boolean inEventLoop) { + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + } }