From c6f3b5762e0dc40ab75c27a8f10c5d1412ed3d39 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Thu, 24 May 2012 08:57:10 -0700 Subject: [PATCH] Implement NIO datagram transport with the new API - AbstractChannel now handles flushing a message buffer - Cleaned up DatagramChannel interface - Removed ProtocolFamily because a user can create an NIO DatagramChannel and specify it as a constructor parameter - UniqueName and UniqueKey constructors became public so that I don't need to create a subclass every time. --- .../src/main/java/io/netty/util/Signal.java | 10 +- .../main/java/io/netty/util/UniqueKey.java | 2 +- .../main/java/io/netty/util/UniqueName.java | 2 +- .../io/netty/channel/AbstractChannel.java | 56 +- .../netty/channel/socket/DatagramChannel.java | 51 +- .../netty/channel/socket/DatagramPacket.java | 36 ++ .../socket/nio/AbstractNioChannel.java | 11 + .../socket/nio/NioDatagramChannel.java | 484 +++++++++++------- .../socket/nio/NioDatagramPipelineSink.java | 169 ------ .../channel/socket/nio/NioDatagramWorker.java | 303 ----------- 10 files changed, 450 insertions(+), 674 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/socket/DatagramPacket.java delete mode 100644 transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java delete mode 100644 transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java diff --git a/common/src/main/java/io/netty/util/Signal.java b/common/src/main/java/io/netty/util/Signal.java index 8b3f481bb8..430850008c 100644 --- a/common/src/main/java/io/netty/util/Signal.java +++ b/common/src/main/java/io/netty/util/Signal.java @@ -11,11 +11,11 @@ public final class Signal extends Error { private static final ConcurrentMap map = new ConcurrentHashMap(); - private final SignalName uname; + private final UniqueName uname; public Signal(String name) { super(name); - uname = new SignalName(name); + uname = new UniqueName(map, name); } public void expect(Signal signal) { @@ -38,10 +38,4 @@ public final class Signal extends Error { public String toString() { return uname.name(); } - - private static class SignalName extends UniqueName { - protected SignalName(String name) { - super(map, name); - } - } } diff --git a/common/src/main/java/io/netty/util/UniqueKey.java b/common/src/main/java/io/netty/util/UniqueKey.java index 609f806066..a878d3c6db 100644 --- a/common/src/main/java/io/netty/util/UniqueKey.java +++ b/common/src/main/java/io/netty/util/UniqueKey.java @@ -7,7 +7,7 @@ public class UniqueKey extends UniqueName { private final Class valueType; private final String strVal; - protected UniqueKey(ConcurrentMap map, String name, Class valueType) { + public UniqueKey(ConcurrentMap map, String name, Class valueType) { super(map, name, valueType); this.valueType = valueType; strVal = name + '[' + valueType.getSimpleName() + ']'; diff --git a/common/src/main/java/io/netty/util/UniqueName.java b/common/src/main/java/io/netty/util/UniqueName.java index a2d88cd714..f4adb62f6b 100644 --- a/common/src/main/java/io/netty/util/UniqueName.java +++ b/common/src/main/java/io/netty/util/UniqueName.java @@ -10,7 +10,7 @@ public class UniqueName implements Comparable { private final int id; private final String name; - protected UniqueName(ConcurrentMap map, String name, Object... args) { + public UniqueName(ConcurrentMap map, String name, Object... args) { if (map == null) { throw new NullPointerException("map"); } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 76bb29a257..7af5c1fae0 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.net.ConnectException; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledFuture; @@ -715,20 +716,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private void flush0() { // Perform outbound I/O. try { - for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { - int localFlushedAmount = doFlush(i == 0); - if (localFlushedAmount > 0) { - flushedAmount += localFlushedAmount; - notifyFlushFutures(); - break; - } - if (out().isEmpty()) { - // Reset reader/writerIndex to 0 if the buffer is empty. - if (out().hasByteBuffer()) { - out().byteBuffer().clear(); - } - break; - } + ChannelBufferHolder out = out(); + if (out.hasByteBuffer()) { + flushByteBuf(out.byteBuffer()); + } else { + flushMessageBuf(out.messageBuffer()); } } catch (Throwable t) { notifyFlushFutures(t); @@ -741,6 +733,42 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } + private void flushByteBuf(ChannelBuffer buf) throws Exception { + for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { + int localFlushedAmount = doFlush(i == 0); + if (localFlushedAmount > 0) { + flushedAmount += localFlushedAmount; + notifyFlushFutures(); + break; + } + if (!buf.readable()) { + // Reset reader/writerIndex to 0 if the buffer is empty. + buf.clear(); + break; + } + } + } + + private void flushMessageBuf(Queue buf) throws Exception { + final int writeSpinCount = config().getWriteSpinCount() - 1; + while (!buf.isEmpty()) { + boolean wrote = false; + for (int i = writeSpinCount; i >= 0; i --) { + int localFlushedAmount = doFlush(i == 0); + if (localFlushedAmount > 0) { + flushedAmount += localFlushedAmount; + wrote = true; + notifyFlushFutures(); + break; + } + } + + if (!wrote) { + break; + } + } + } + private void notifyFlushFutures() { FlushFutureEntry e = flushFuture; if (e == null) { diff --git a/transport/src/main/java/io/netty/channel/socket/DatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/DatagramChannel.java index 88dd680db7..fe08ae8850 100644 --- a/transport/src/main/java/io/netty/channel/socket/DatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/DatagramChannel.java @@ -38,22 +38,63 @@ public interface DatagramChannel extends Channel { /** * Joins a multicast group. */ - void joinGroup(InetAddress multicastAddress, ChannelFuture future); + ChannelFuture joinGroup(InetAddress multicastAddress); + ChannelFuture joinGroup(InetAddress multicastAddress, ChannelFuture future); /** * Joins the specified multicast group at the specified interface. */ - void joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface, ChannelFuture future); + ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface); + ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface, ChannelFuture future); - void joinGroup(InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source, ChannelFuture future); + ChannelFuture joinGroup(InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source); + ChannelFuture joinGroup(InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source, ChannelFuture future); /** * Leaves a multicast group. */ - void leaveGroup(InetAddress multicastAddress, ChannelFuture future); + ChannelFuture leaveGroup(InetAddress multicastAddress); + ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelFuture future); /** * Leaves a multicast group on a specified local interface. */ - void leaveGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface, ChannelFuture future); + ChannelFuture leaveGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface); + ChannelFuture leaveGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface, ChannelFuture future); + + /** + * Leave the specified multicast group at the specified interface using the specified source. + */ + ChannelFuture leaveGroup( + InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source); + ChannelFuture leaveGroup( + InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source, + ChannelFuture future); + + /** + * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface + */ + ChannelFuture block( + InetAddress multicastAddress, NetworkInterface networkInterface, + InetAddress sourceToBlock); + + /** + * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface + */ + ChannelFuture block( + InetAddress multicastAddress, NetworkInterface networkInterface, + InetAddress sourceToBlock, ChannelFuture future); + + /** + * Block the given sourceToBlock address for the given multicastAddress + * + */ + ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock); + + /** + * Block the given sourceToBlock address for the given multicastAddress + * + */ + ChannelFuture block( + InetAddress multicastAddress, InetAddress sourceToBlock, ChannelFuture future); } diff --git a/transport/src/main/java/io/netty/channel/socket/DatagramPacket.java b/transport/src/main/java/io/netty/channel/socket/DatagramPacket.java new file mode 100644 index 0000000000..a7a79b6d79 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/DatagramPacket.java @@ -0,0 +1,36 @@ +package io.netty.channel.socket; + +import io.netty.buffer.ChannelBuffer; + +import java.net.InetSocketAddress; + +public class DatagramPacket { + + private final ChannelBuffer data; + private final InetSocketAddress remoteAddress; + + public DatagramPacket(ChannelBuffer data, InetSocketAddress remoteAddress) { + if (data == null) { + throw new NullPointerException("data"); + } + if (remoteAddress == null) { + throw new NullPointerException("remoteAddress"); + } + + this.data = data; + this.remoteAddress = remoteAddress; + } + + public ChannelBuffer data() { + return data; + } + + public InetSocketAddress remoteAddress() { + return remoteAddress; + } + + @Override + public String toString() { + return "datagram(" + data.readableBytes() + "B, " + remoteAddress + ')'; + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java index 5163bdf5bc..8b41af86b2 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -19,6 +19,7 @@ import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.EventLoop; +import java.net.InetSocketAddress; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; @@ -32,6 +33,16 @@ public abstract class AbstractNioChannel extends AbstractChannel { this.ch = ch; } + @Override + public InetSocketAddress localAddress() { + return (InetSocketAddress) super.localAddress(); + } + + @Override + public InetSocketAddress remoteAddress() { + return (InetSocketAddress) super.remoteAddress(); + } + @Override protected SelectableChannel javaChannel() { return ch; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index 5e20027f10..7f71645e6d 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -15,14 +15,15 @@ */ package io.netty.channel.socket.nio; -import static io.netty.channel.Channels.fireChannelOpen; +import io.netty.buffer.ChannelBuffers; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelSink; -import io.netty.channel.Channels; import io.netty.channel.socket.DatagramChannelConfig; +import io.netty.channel.socket.DatagramPacket; +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.DetectionUtil; import java.io.IOException; @@ -31,151 +32,257 @@ import java.net.InetSocketAddress; import java.net.NetworkInterface; import java.net.SocketAddress; import java.net.SocketException; +import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.MembershipKey; +import java.nio.channels.SelectionKey; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; /** * Provides an NIO based {@link io.netty.channel.socket.DatagramChannel}. */ public final class NioDatagramChannel extends AbstractNioChannel implements io.netty.channel.socket.DatagramChannel { - /** - * The supported ProtocolFamily by UDP - * - */ - public enum ProtocolFamily { - INET, - INET6 - } - /** - * The {@link DatagramChannelConfig}. - */ - private final NioDatagramChannelConfig config; - private Map> memberships; - - static NioDatagramChannel create(ChannelFactory factory, - ChannelPipeline pipeline, ChannelSink sink, NioDatagramWorker worker, ProtocolFamily family) { - NioDatagramChannel instance = - new NioDatagramChannel(factory, pipeline, sink, worker, family); - fireChannelOpen(instance); - return instance; - } + private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioDatagramChannel.class); - private NioDatagramChannel(final ChannelFactory factory, - final ChannelPipeline pipeline, final ChannelSink sink, - final NioDatagramWorker worker, ProtocolFamily family) { - super(null, factory, pipeline, sink, worker, new NioDatagramJdkChannel(openNonBlockingChannel(family))); - config = new DefaultNioDatagramChannelConfig(getJdkChannel().getChannel()); - } + private final DatagramChannelConfig config; + private final Map> memberships = + new HashMap>(); + private final ChannelBufferHolder out = ChannelBufferHolders.messageBuffer(); - private static DatagramChannel openNonBlockingChannel(ProtocolFamily family) { + private static DatagramChannel newSocket() { try { - final DatagramChannel channel; - - // check if we are on java 7 or if the family was not specified - if (DetectionUtil.javaVersion() < 7 || family == null) { - channel = DatagramChannel.open(); - } else { - // This block only works on java7++, but we checked before if we have it - switch (family) { - case INET: - channel = DatagramChannel.open(java.net.StandardProtocolFamily.INET); - break; - - case INET6: - channel = DatagramChannel.open(java.net.StandardProtocolFamily.INET6); - break; - - default: - throw new IllegalArgumentException(); - } - } - - channel.configureBlocking(false); - return channel; - } catch (final IOException e) { - throw new ChannelException("Failed to open a DatagramChannel.", e); + return DatagramChannel.open(); + } catch (IOException e) { + throw new ChannelException("Failed to open a socket.", e); } } + public NioDatagramChannel() { + this(newSocket()); + } + + public NioDatagramChannel(DatagramChannel socket) { + this(null, socket); + } + + public NioDatagramChannel(Integer id, DatagramChannel socket) { + super(null, id, socket); + try { + socket.configureBlocking(false); + } catch (IOException e) { + try { + socket.close(); + } catch (IOException e2) { + if (logger.isWarnEnabled()) { + logger.warn( + "Failed to close a partially initialized socket.", e2); + } + + } + + throw new ChannelException("Failed to enter non-blocking mode.", e); + } + + config = new DefaultNioDatagramChannelConfig(socket); - @Override - protected NioDatagramJdkChannel getJdkChannel() { - return (NioDatagramJdkChannel) super.getJdkChannel(); } @Override - public NioDatagramWorker getWorker() { - return (NioDatagramWorker) super.getWorker(); - } - - @Override - public boolean isBound() { - return isOpen() && getJdkChannel().isSocketBound(); - } - - @Override - public boolean isConnected() { - return getJdkChannel().isConnected(); - } - - @Override - protected boolean setClosed() { - return super.setClosed(); - } - - @Override - public NioDatagramChannelConfig getConfig() { + public DatagramChannelConfig config() { return config; } @Override - public ChannelFuture joinGroup(InetAddress multicastAddress) { - try { - return joinGroup(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null); - } catch (SocketException e) { - return Channels.failedFuture(this, e); + public boolean isActive() { + DatagramChannel ch = javaChannel(); + return ch.isOpen() && ch.socket().isBound(); + } + + @Override + protected DatagramChannel javaChannel() { + return (DatagramChannel) super.javaChannel(); + } + + @Override + protected ChannelBufferHolder firstOut() { + return out; + } + + @Override + protected SocketAddress localAddress0() { + return javaChannel().socket().getLocalSocketAddress(); + } + + @Override + protected SocketAddress remoteAddress0() { + return javaChannel().socket().getRemoteSocketAddress(); + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + javaChannel().bind(localAddress); + selectionKey().interestOps(SelectionKey.OP_READ); + } + + @Override + protected boolean doConnect(SocketAddress remoteAddress, + SocketAddress localAddress) throws Exception { + if (localAddress != null) { + javaChannel().socket().bind(localAddress); + } + + boolean success = false; + try { + javaChannel().connect(remoteAddress); + selectionKey().interestOps(selectionKey().interestOps() | SelectionKey.OP_READ); + success = true; + return true; + } finally { + if (!success) { + doClose(); + } } } @Override - public ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface) { - return joinGroup(multicastAddress.getAddress(), networkInterface, null); + protected void doFinishConnect() throws Exception { + throw new Error(); } - /** - * Joins the specified multicast group at the specified interface using the specified source. - */ - public ChannelFuture joinGroup(InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) { + @Override + protected void doDisconnect() throws Exception { + javaChannel().disconnect(); + } + + @Override + protected void doClose() throws Exception { + javaChannel().close(); + } + + @Override + protected void doDeregister() throws Exception { + selectionKey().cancel(); + ((SingleThreadSelectorEventLoop) eventLoop()).cancelledKeys ++; + } + + @Override + protected int doRead(ChannelBufferHolder buf) throws Exception { + DatagramChannel ch = javaChannel(); + ByteBuffer data = ByteBuffer.allocate(1024); + InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(data); + if (remoteAddress == null) { + return 0; + } + + data.flip(); + buf.messageBuffer().add(new DatagramPacket(ChannelBuffers.wrappedBuffer(data), remoteAddress)); + return 1; + } + + @Override + protected int doFlush(boolean lastSpin) throws Exception { + final Queue buf = unsafe().out().messageBuffer(); + if (buf.isEmpty()) { + return 0; + } + + DatagramPacket packet = (DatagramPacket) buf.peek(); + final int writtenBytes = javaChannel().send(packet.data().toByteBuffer(), packet.remoteAddress()); + + final SelectionKey key = selectionKey(); + final int interestOps = key.interestOps(); + if (writtenBytes <= 0) { + // Did not write a packet. + // 1) If 'lastSpin' is false, the caller will call this method again real soon. + // - Do not update OP_WRITE. + // 2) If 'lastSpin' is true, the caller will not retry. + // - Set OP_WRITE so that the event loop calls flushForcibly() later. + if (lastSpin) { + if ((interestOps & SelectionKey.OP_WRITE) == 0) { + key.interestOps(interestOps | SelectionKey.OP_WRITE); + } + } + return 0; + } + + // Wrote a packet. + buf.remove(); + if (buf.isEmpty()) { + // Wrote the outbound buffer completely - clear OP_WRITE. + if ((interestOps & SelectionKey.OP_WRITE) != 0) { + key.interestOps(interestOps & ~SelectionKey.OP_WRITE); + } + } + return 1; + } + + @Override + public ChannelFuture joinGroup(InetAddress multicastAddress) { + return joinGroup(multicastAddress, newFuture()); + } + + @Override + public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelFuture future) { + try { + return joinGroup( + multicastAddress, + NetworkInterface.getByInetAddress(localAddress().getAddress()), + null, future); + } catch (SocketException e) { + future.setFailure(e); + } + return future; + } + + @Override + public ChannelFuture joinGroup( + InetSocketAddress multicastAddress, NetworkInterface networkInterface) { + return joinGroup(multicastAddress, networkInterface, newFuture()); + } + + @Override + public ChannelFuture joinGroup( + InetSocketAddress multicastAddress, NetworkInterface networkInterface, + ChannelFuture future) { + return joinGroup(multicastAddress.getAddress(), networkInterface, null, future); + } + + @Override + public ChannelFuture joinGroup( + InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) { + return joinGroup(multicastAddress, networkInterface, source, newFuture()); + } + + @Override + public ChannelFuture joinGroup( + InetAddress multicastAddress, NetworkInterface networkInterface, + InetAddress source, ChannelFuture future) { if (DetectionUtil.javaVersion() < 7) { throw new UnsupportedOperationException(); } else { if (multicastAddress == null) { throw new NullPointerException("multicastAddress"); } - + if (networkInterface == null) { throw new NullPointerException("networkInterface"); } - + try { MembershipKey key; if (source == null) { - key = getJdkChannel().getChannel().join(multicastAddress, networkInterface); + key = javaChannel().join(multicastAddress, networkInterface); } else { - key = getJdkChannel().getChannel().join(multicastAddress, networkInterface, source); + key = javaChannel().join(multicastAddress, networkInterface, source); } synchronized (this) { - if (memberships == null) { - memberships = new HashMap>(); - - } List keys = memberships.get(multicastAddress); if (keys == null) { keys = new ArrayList(); @@ -183,77 +290,106 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n } keys.add(key); } + + future.setSuccess(); } catch (Throwable e) { - return Channels.failedFuture(this, e); + future.setFailure(e); } } - return Channels.succeededFuture(this); + return future; } - + @Override public ChannelFuture leaveGroup(InetAddress multicastAddress) { - try { - return leaveGroup(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null); - } catch (SocketException e) { - return Channels.failedFuture(this, e); - } - + return leaveGroup(multicastAddress, newFuture()); } @Override - public ChannelFuture leaveGroup(InetSocketAddress multicastAddress, - NetworkInterface networkInterface) { - return leaveGroup(multicastAddress.getAddress(), networkInterface, null); + public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelFuture future) { + try { + return leaveGroup(multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, future); + } catch (SocketException e) { + future.setFailure(e); + } + return future; } - /** - * Leave the specified multicast group at the specified interface using the specified source. - */ - public ChannelFuture leaveGroup(InetAddress multicastAddress, - NetworkInterface networkInterface, InetAddress source) { + @Override + public ChannelFuture leaveGroup( + InetSocketAddress multicastAddress, NetworkInterface networkInterface) { + return leaveGroup(multicastAddress, networkInterface, newFuture()); + } + + @Override + public ChannelFuture leaveGroup( + InetSocketAddress multicastAddress, + NetworkInterface networkInterface, ChannelFuture future) { + return leaveGroup(multicastAddress.getAddress(), networkInterface, null, future); + } + + @Override + public ChannelFuture leaveGroup( + InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) { + return leaveGroup(multicastAddress, networkInterface, source, newFuture()); + } + + @Override + public ChannelFuture leaveGroup( + InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source, + ChannelFuture future) { if (DetectionUtil.javaVersion() < 7) { throw new UnsupportedOperationException(); - } else { - if (multicastAddress == null) { - throw new NullPointerException("multicastAddress"); - } - - if (networkInterface == null) { - throw new NullPointerException("networkInterface"); - } - - synchronized (this) { - if (memberships != null) { - List keys = memberships.get(multicastAddress); - if (keys != null) { - Iterator keyIt = keys.iterator(); - - while (keyIt.hasNext()) { - MembershipKey key = keyIt.next(); - if (networkInterface.equals(key.networkInterface())) { - if (source == null && key.sourceAddress() == null || (source != null && source.equals(key.sourceAddress()))) { - key.drop(); - keyIt.remove(); - } - - } - } - if (keys.isEmpty()) { - memberships.remove(multicastAddress); + } + if (multicastAddress == null) { + throw new NullPointerException("multicastAddress"); + } + if (networkInterface == null) { + throw new NullPointerException("networkInterface"); + } + + synchronized (this) { + if (memberships != null) { + List keys = memberships.get(multicastAddress); + if (keys != null) { + Iterator keyIt = keys.iterator(); + + while (keyIt.hasNext()) { + MembershipKey key = keyIt.next(); + if (networkInterface.equals(key.networkInterface())) { + if (source == null && key.sourceAddress() == null || source != null && source.equals(key.sourceAddress())) { + key.drop(); + keyIt.remove(); + } } } + if (keys.isEmpty()) { + memberships.remove(multicastAddress); + } } } - return Channels.succeededFuture(this); } + + future.setSuccess(); + return future; } - + /** * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface - * */ - public ChannelFuture block(InetAddress multicastAddress, - NetworkInterface networkInterface, InetAddress sourceToBlock) { + @Override + public ChannelFuture block( + InetAddress multicastAddress, NetworkInterface networkInterface, + InetAddress sourceToBlock) { + return block(multicastAddress, networkInterface, sourceToBlock, newFuture()); + } + + /** + * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface + */ + @Override + public ChannelFuture block( + InetAddress multicastAddress, NetworkInterface networkInterface, + InetAddress sourceToBlock, ChannelFuture future) { if (DetectionUtil.javaVersion() < 7) { throw new UnsupportedOperationException(); } else { @@ -263,7 +399,7 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n if (sourceToBlock == null) { throw new NullPointerException("sourceToBlock"); } - + if (networkInterface == null) { throw new NullPointerException("networkInterface"); } @@ -275,39 +411,41 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n try { key.block(sourceToBlock); } catch (IOException e) { - return Channels.failedFuture(this, e); + future.setFailure(e); } } } } } - return Channels.succeededFuture(this); - - + future.setSuccess(); + return future; } } - + /** * Block the given sourceToBlock address for the given multicastAddress - * + * */ - public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) { - try { - block(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), sourceToBlock); - } catch (SocketException e) { - return Channels.failedFuture(this, e); - } - return Channels.succeededFuture(this); - - } - @Override - public ChannelFuture write(Object message, SocketAddress remoteAddress) { - if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) { - return super.write(message, null); - } else { - return super.write(message, remoteAddress); - } + public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) { + return block(multicastAddress, sourceToBlock, newFuture()); + } + /** + * Block the given sourceToBlock address for the given multicastAddress + * + */ + @Override + public ChannelFuture block( + InetAddress multicastAddress, InetAddress sourceToBlock, ChannelFuture future) { + try { + return block( + multicastAddress, + NetworkInterface.getByInetAddress(localAddress().getAddress()), + sourceToBlock, future); + } catch (SocketException e) { + future.setFailure(e); + } + return future; } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java deleted file mode 100644 index a22318eb50..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import static io.netty.channel.Channels.*; - -import java.net.InetSocketAddress; -import java.net.SocketAddress; - -import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelState; -import io.netty.channel.ChannelStateEvent; -import io.netty.channel.MessageEvent; - -/** - * Receives downstream events from a {@link ChannelPipeline}. It contains - * an array of I/O workers. - */ -class NioDatagramPipelineSink extends AbstractNioChannelSink { - - /** - * Handle downstream event. - * - * @param pipeline the {@link ChannelPipeline} that passes down the - * downstream event. - * @param e The downstream event. - */ - @Override - public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e) - throws Exception { - final NioDatagramChannel channel = (NioDatagramChannel) e.getChannel(); - final ChannelFuture future = e.getFuture(); - if (e instanceof ChannelStateEvent) { - final ChannelStateEvent stateEvent = (ChannelStateEvent) e; - final ChannelState state = stateEvent.getState(); - final Object value = stateEvent.getValue(); - switch (state) { - case OPEN: - if (Boolean.FALSE.equals(value)) { - channel.getWorker().close(channel, future); - } - break; - case BOUND: - if (value != null) { - bind(channel, future, (InetSocketAddress) value); - } else { - channel.getWorker().close(channel, future); - } - break; - case CONNECTED: - if (value != null) { - connect(channel, future, (InetSocketAddress) value); - } else { - channel.getWorker().disconnect(channel, future); - } - break; - case INTEREST_OPS: - channel.getWorker().setInterestOps(channel, future, ((Integer) value).intValue()); - break; - } - } else if (e instanceof MessageEvent) { - final MessageEvent event = (MessageEvent) e; - final boolean offered = channel.writeBufferQueue.offer(event); - assert offered; - channel.getWorker().writeFromUserCode(channel); - } - } - - private void close(NioDatagramChannel channel, ChannelFuture future) { - try { - channel.getJdkChannel().closeSocket(); - if (channel.setClosed()) { - future.setSuccess(); - if (channel.isBound()) { - fireChannelUnbound(channel); - } - fireChannelClosed(channel); - } else { - future.setSuccess(); - } - } catch (final Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } - - /** - * Will bind the DatagramSocket to the passed-in address. - * Every call bind will spawn a new thread using the that basically in turn - */ - private void bind(final NioDatagramChannel channel, - final ChannelFuture future, final InetSocketAddress address) { - boolean bound = false; - boolean started = false; - try { - // First bind the DatagramSocket the specified port. - channel.getJdkChannel().bind(address); - bound = true; - - future.setSuccess(); - fireChannelBound(channel, address); - - channel.getWorker().registerWithWorker(channel, null); - started = true; - } catch (final Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } finally { - if (!started && bound) { - close(channel, future); - } - } - } - - private void connect( - NioDatagramChannel channel, ChannelFuture future, - SocketAddress remoteAddress) { - - boolean bound = channel.isBound(); - boolean connected = false; - boolean workerStarted = false; - - future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - - // Clear the cached address so that the next getRemoteAddress() call - // updates the cache. - channel.remoteAddress = null; - - try { - channel.getJdkChannel().connect(remoteAddress); - connected = true; - - // Fire events. - future.setSuccess(); - if (!bound) { - fireChannelBound(channel, channel.getLocalAddress()); - } - fireChannelConnected(channel, channel.getRemoteAddress()); - - if (!bound) { - channel.getWorker().registerWithWorker(channel, future); - } - - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } finally { - if (connected && !workerStarted) { - channel.getWorker().close(channel, future); - } - } - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java deleted file mode 100644 index 63186549fd..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import static io.netty.channel.Channels.fireChannelDisconnected; -import static io.netty.channel.Channels.fireChannelDisconnectedLater; -import static io.netty.channel.Channels.fireExceptionCaught; -import static io.netty.channel.Channels.fireExceptionCaughtLater; -import static io.netty.channel.Channels.fireMessageReceived; -import static io.netty.channel.Channels.succeededFuture; -import io.netty.buffer.ChannelBufferFactory; -import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFuture; -import io.netty.channel.Channels; -import io.netty.channel.MessageEvent; -import io.netty.channel.ReceiveBufferSizePredictor; -import io.netty.channel.socket.nio.SendBufferPool.SendBuffer; - -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousCloseException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.DatagramChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.util.Queue; -import java.util.concurrent.Executor; - -/** - * A class responsible for registering channels with {@link Selector}. - * It also implements the {@link Selector} loop. - */ -public class NioDatagramWorker extends SingleThreadSelectorEventLoop { - - /** - * Sole constructor. - * - * @param executor the {@link Executor} used to execute {@link Runnable}s - * such as {@link ChannelRegistionTask} - */ - NioDatagramWorker(final Executor executor) { - super(executor); - } - - NioDatagramWorker(final Executor executor, boolean allowShutdownOnIdle) { - super(executor, allowShutdownOnIdle); - } - - @Override - protected boolean read(final SelectionKey key) { - final NioDatagramChannel channel = (NioDatagramChannel) key.attachment(); - ReceiveBufferSizePredictor predictor = - channel.getConfig().getReceiveBufferSizePredictor(); - final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory(); - final DatagramChannel nioChannel = (DatagramChannel) key.channel(); - - // Allocating a non-direct buffer with a max udp packge size. - // Would using a direct buffer be more efficient or would this negatively - // effect performance, as direct buffer allocation has a higher upfront cost - // where as a ByteBuffer is heap allocated. - final ByteBuffer byteBuffer = ByteBuffer.allocate( - predictor.nextReceiveBufferSize()).order(bufferFactory.getDefaultOrder()); - - boolean failure = true; - SocketAddress remoteAddress = null; - try { - // Receive from the channel in a non blocking mode. We have already been notified that - // the channel is ready to receive. - remoteAddress = nioChannel.receive(byteBuffer); - failure = false; - } catch (ClosedChannelException e) { - // Can happen, and does not need a user attention. - } catch (Throwable t) { - fireExceptionCaught(channel, t); - } - - if (remoteAddress != null) { - // Flip the buffer so that we can wrap it. - byteBuffer.flip(); - - int readBytes = byteBuffer.remaining(); - if (readBytes > 0) { - // Update the predictor. - predictor.previousReceiveBufferSize(readBytes); - - // Notify the interested parties about the newly arrived message. - fireMessageReceived( - channel, bufferFactory.getBuffer(byteBuffer), remoteAddress); - } - } - - if (failure) { - key.cancel(); // Some JDK implementations run into an infinite loop without this. - close(channel, succeededFuture(channel)); - return false; - } - - return true; - } - - void disconnect(NioDatagramChannel channel, ChannelFuture future) { - boolean connected = channel.isConnected(); - boolean iothread = isIoThread(); - try { - channel.getJdkChannel().disconnectSocket(); - future.setSuccess(); - if (connected) { - if (iothread) { - fireChannelDisconnected(channel); - } else { - fireChannelDisconnectedLater(channel); - } - } - } catch (Throwable t) { - future.setFailure(t); - if (iothread) { - fireExceptionCaught(channel, t); - } else { - fireExceptionCaughtLater(channel, t); - } - } - } - - - @Override - protected void registerTask(AbstractNioChannel channel, ChannelFuture future) { - final SocketAddress localAddress = channel.getLocalAddress(); - if (localAddress == null) { - if (future != null) { - future.setFailure(new ClosedChannelException()); - } - close(channel, succeededFuture(channel)); - return; - } - - try { - synchronized (channel.interestOpsLock) { - channel.getJdkChannel().register( - selector, channel.getRawInterestOps(), channel); - } - if (future != null) { - future.setSuccess(); - } - } catch (final ClosedChannelException e) { - if (future != null) { - future.setFailure(e); - } - close(channel, succeededFuture(channel)); - throw new ChannelException( - "Failed to register a socket to the selector.", e); - } - } - - @Override - public void writeFromUserCode(final AbstractNioChannel channel) { - /* - * Note that we are not checking if the channel is connected. Connected - * has a different meaning in UDP and means that the channels socket is - * configured to only send and receive from a given remote peer. - */ - if (!channel.isBound()) { - cleanUpWriteBuffer(channel); - return; - } - - if (scheduleWriteIfNecessary(channel)) { - return; - } - - // From here, we are sure Thread.currentThread() == workerThread. - - if (channel.writeSuspended) { - return; - } - - if (channel.inWriteNowLoop) { - return; - } - - write0(channel); - } - - @Override - protected void write0(final AbstractNioChannel channel) { - - boolean addOpWrite = false; - boolean removeOpWrite = false; - - long writtenBytes = 0; - - final SendBufferPool sendBufferPool = this.sendBufferPool; - final DatagramChannel ch = ((NioDatagramChannel) channel).getJdkChannel().getChannel(); - final Queue writeBuffer = channel.writeBufferQueue; - final int writeSpinCount = channel.getConfig().getWriteSpinCount(); - synchronized (channel.writeLock) { - // inform the channel that write is in-progress - channel.inWriteNowLoop = true; - - // loop forever... - for (;;) { - MessageEvent evt = channel.currentWriteEvent; - SendBuffer buf; - if (evt == null) { - if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) { - removeOpWrite = true; - channel.writeSuspended = false; - break; - } - - channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage()); - } else { - buf = channel.currentWriteBuffer; - } - - try { - long localWrittenBytes = 0; - SocketAddress raddr = evt.getRemoteAddress(); - if (raddr == null) { - for (int i = writeSpinCount; i > 0; i --) { - localWrittenBytes = buf.transferTo(ch); - if (localWrittenBytes != 0) { - writtenBytes += localWrittenBytes; - break; - } - if (buf.finished()) { - break; - } - } - } else { - for (int i = writeSpinCount; i > 0; i --) { - localWrittenBytes = buf.transferTo(ch, raddr); - if (localWrittenBytes != 0) { - writtenBytes += localWrittenBytes; - break; - } - if (buf.finished()) { - break; - } - } - } - - if (localWrittenBytes > 0 || buf.finished()) { - // Successful write - proceed to the next message. - buf.release(); - ChannelFuture future = evt.getFuture(); - channel.currentWriteEvent = null; - channel.currentWriteBuffer = null; - evt = null; - buf = null; - future.setSuccess(); - } else { - // Not written at all - perhaps the kernel buffer is full. - addOpWrite = true; - channel.writeSuspended = true; - break; - } - } catch (final AsynchronousCloseException e) { - // Doesn't need a user attention - ignore. - } catch (final Throwable t) { - buf.release(); - ChannelFuture future = evt.getFuture(); - channel.currentWriteEvent = null; - channel.currentWriteBuffer = null; - buf = null; - evt = null; - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } - channel.inWriteNowLoop = false; - - // Initially, the following block was executed after releasing - // the writeLock, but there was a race condition, and it has to be - // executed before releasing the writeLock: - // - // https://issues.jboss.org/browse/NETTY-410 - // - if (addOpWrite) { - setOpWrite(channel); - } else if (removeOpWrite) { - clearOpWrite(channel); - } - } - - Channels.fireWriteComplete(channel, writtenBytes); - } - - - -}