Implement NIO datagram transport with the new API

- AbstractChannel now handles flushing a message buffer
- Cleaned up DatagramChannel interface
- Removed ProtocolFamily because a user can create an NIO
  DatagramChannel and specify it as a constructor parameter
- UniqueName and UniqueKey constructors became public so that
  I don't need to create a subclass every time.
This commit is contained in:
Trustin Lee 2012-05-24 08:57:10 -07:00
parent cd11786994
commit c6f3b5762e
10 changed files with 450 additions and 674 deletions

View File

@ -11,11 +11,11 @@ public final class Signal extends Error {
private static final ConcurrentMap<String, Boolean> map =
new ConcurrentHashMap<String, Boolean>();
private final SignalName uname;
private final UniqueName uname;
public Signal(String name) {
super(name);
uname = new SignalName(name);
uname = new UniqueName(map, name);
}
public void expect(Signal signal) {
@ -38,10 +38,4 @@ public final class Signal extends Error {
public String toString() {
return uname.name();
}
private static class SignalName extends UniqueName {
protected SignalName(String name) {
super(map, name);
}
}
}

View File

@ -7,7 +7,7 @@ public class UniqueKey<T> extends UniqueName {
private final Class<T> valueType;
private final String strVal;
protected UniqueKey(ConcurrentMap<String, Boolean> map, String name, Class<T> valueType) {
public UniqueKey(ConcurrentMap<String, Boolean> map, String name, Class<T> valueType) {
super(map, name, valueType);
this.valueType = valueType;
strVal = name + '[' + valueType.getSimpleName() + ']';

View File

@ -10,7 +10,7 @@ public class UniqueName implements Comparable<UniqueName> {
private final int id;
private final String name;
protected UniqueName(ConcurrentMap<String, Boolean> map, String name, Object... args) {
public UniqueName(ConcurrentMap<String, Boolean> map, String name, Object... args) {
if (map == null) {
throw new NullPointerException("map");
}

View File

@ -24,6 +24,7 @@ import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
@ -715,20 +716,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private void flush0() {
// Perform outbound I/O.
try {
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
int localFlushedAmount = doFlush(i == 0);
if (localFlushedAmount > 0) {
flushedAmount += localFlushedAmount;
notifyFlushFutures();
break;
}
if (out().isEmpty()) {
// Reset reader/writerIndex to 0 if the buffer is empty.
if (out().hasByteBuffer()) {
out().byteBuffer().clear();
}
break;
}
ChannelBufferHolder<Object> out = out();
if (out.hasByteBuffer()) {
flushByteBuf(out.byteBuffer());
} else {
flushMessageBuf(out.messageBuffer());
}
} catch (Throwable t) {
notifyFlushFutures(t);
@ -741,6 +733,42 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
private void flushByteBuf(ChannelBuffer buf) throws Exception {
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
int localFlushedAmount = doFlush(i == 0);
if (localFlushedAmount > 0) {
flushedAmount += localFlushedAmount;
notifyFlushFutures();
break;
}
if (!buf.readable()) {
// Reset reader/writerIndex to 0 if the buffer is empty.
buf.clear();
break;
}
}
}
private void flushMessageBuf(Queue<Object> buf) throws Exception {
final int writeSpinCount = config().getWriteSpinCount() - 1;
while (!buf.isEmpty()) {
boolean wrote = false;
for (int i = writeSpinCount; i >= 0; i --) {
int localFlushedAmount = doFlush(i == 0);
if (localFlushedAmount > 0) {
flushedAmount += localFlushedAmount;
wrote = true;
notifyFlushFutures();
break;
}
}
if (!wrote) {
break;
}
}
}
private void notifyFlushFutures() {
FlushFutureEntry e = flushFuture;
if (e == null) {

View File

@ -38,22 +38,63 @@ public interface DatagramChannel extends Channel {
/**
* Joins a multicast group.
*/
void joinGroup(InetAddress multicastAddress, ChannelFuture future);
ChannelFuture joinGroup(InetAddress multicastAddress);
ChannelFuture joinGroup(InetAddress multicastAddress, ChannelFuture future);
/**
* Joins the specified multicast group at the specified interface.
*/
void joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface, ChannelFuture future);
ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface);
ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface, ChannelFuture future);
void joinGroup(InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source, ChannelFuture future);
ChannelFuture joinGroup(InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source);
ChannelFuture joinGroup(InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source, ChannelFuture future);
/**
* Leaves a multicast group.
*/
void leaveGroup(InetAddress multicastAddress, ChannelFuture future);
ChannelFuture leaveGroup(InetAddress multicastAddress);
ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelFuture future);
/**
* Leaves a multicast group on a specified local interface.
*/
void leaveGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface, ChannelFuture future);
ChannelFuture leaveGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface);
ChannelFuture leaveGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface, ChannelFuture future);
/**
* Leave the specified multicast group at the specified interface using the specified source.
*/
ChannelFuture leaveGroup(
InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source);
ChannelFuture leaveGroup(
InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
ChannelFuture future);
/**
* Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
*/
ChannelFuture block(
InetAddress multicastAddress, NetworkInterface networkInterface,
InetAddress sourceToBlock);
/**
* Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
*/
ChannelFuture block(
InetAddress multicastAddress, NetworkInterface networkInterface,
InetAddress sourceToBlock, ChannelFuture future);
/**
* Block the given sourceToBlock address for the given multicastAddress
*
*/
ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock);
/**
* Block the given sourceToBlock address for the given multicastAddress
*
*/
ChannelFuture block(
InetAddress multicastAddress, InetAddress sourceToBlock, ChannelFuture future);
}

View File

@ -0,0 +1,36 @@
package io.netty.channel.socket;
import io.netty.buffer.ChannelBuffer;
import java.net.InetSocketAddress;
public class DatagramPacket {
private final ChannelBuffer data;
private final InetSocketAddress remoteAddress;
public DatagramPacket(ChannelBuffer data, InetSocketAddress remoteAddress) {
if (data == null) {
throw new NullPointerException("data");
}
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
this.data = data;
this.remoteAddress = remoteAddress;
}
public ChannelBuffer data() {
return data;
}
public InetSocketAddress remoteAddress() {
return remoteAddress;
}
@Override
public String toString() {
return "datagram(" + data.readableBytes() + "B, " + remoteAddress + ')';
}
}

View File

@ -19,6 +19,7 @@ import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import java.net.InetSocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
@ -32,6 +33,16 @@ public abstract class AbstractNioChannel extends AbstractChannel {
this.ch = ch;
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
}
@Override
protected SelectableChannel javaChannel() {
return ch;

View File

@ -15,14 +15,15 @@
*/
package io.netty.channel.socket.nio;
import static io.netty.channel.Channels.fireChannelOpen;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.Channels;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.DetectionUtil;
import java.io.IOException;
@ -31,151 +32,257 @@ import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.MembershipKey;
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
/**
* Provides an NIO based {@link io.netty.channel.socket.DatagramChannel}.
*/
public final class NioDatagramChannel extends AbstractNioChannel implements io.netty.channel.socket.DatagramChannel {
/**
* The supported ProtocolFamily by UDP
*
*/
public enum ProtocolFamily {
INET,
INET6
}
/**
* The {@link DatagramChannelConfig}.
*/
private final NioDatagramChannelConfig config;
private Map<InetAddress, List<MembershipKey>> memberships;
static NioDatagramChannel create(ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink, NioDatagramWorker worker, ProtocolFamily family) {
NioDatagramChannel instance =
new NioDatagramChannel(factory, pipeline, sink, worker, family);
fireChannelOpen(instance);
return instance;
}
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioDatagramChannel.class);
private NioDatagramChannel(final ChannelFactory factory,
final ChannelPipeline pipeline, final ChannelSink sink,
final NioDatagramWorker worker, ProtocolFamily family) {
super(null, factory, pipeline, sink, worker, new NioDatagramJdkChannel(openNonBlockingChannel(family)));
config = new DefaultNioDatagramChannelConfig(getJdkChannel().getChannel());
}
private final DatagramChannelConfig config;
private final Map<InetAddress, List<MembershipKey>> memberships =
new HashMap<InetAddress, List<MembershipKey>>();
private final ChannelBufferHolder<Object> out = ChannelBufferHolders.messageBuffer();
private static DatagramChannel openNonBlockingChannel(ProtocolFamily family) {
private static DatagramChannel newSocket() {
try {
final DatagramChannel channel;
// check if we are on java 7 or if the family was not specified
if (DetectionUtil.javaVersion() < 7 || family == null) {
channel = DatagramChannel.open();
} else {
// This block only works on java7++, but we checked before if we have it
switch (family) {
case INET:
channel = DatagramChannel.open(java.net.StandardProtocolFamily.INET);
break;
case INET6:
channel = DatagramChannel.open(java.net.StandardProtocolFamily.INET6);
break;
default:
throw new IllegalArgumentException();
}
}
channel.configureBlocking(false);
return channel;
} catch (final IOException e) {
throw new ChannelException("Failed to open a DatagramChannel.", e);
return DatagramChannel.open();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
public NioDatagramChannel() {
this(newSocket());
}
public NioDatagramChannel(DatagramChannel socket) {
this(null, socket);
}
public NioDatagramChannel(Integer id, DatagramChannel socket) {
super(null, id, socket);
try {
socket.configureBlocking(false);
} catch (IOException e) {
try {
socket.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
config = new DefaultNioDatagramChannelConfig(socket);
@Override
protected NioDatagramJdkChannel getJdkChannel() {
return (NioDatagramJdkChannel) super.getJdkChannel();
}
@Override
public NioDatagramWorker getWorker() {
return (NioDatagramWorker) super.getWorker();
}
@Override
public boolean isBound() {
return isOpen() && getJdkChannel().isSocketBound();
}
@Override
public boolean isConnected() {
return getJdkChannel().isConnected();
}
@Override
protected boolean setClosed() {
return super.setClosed();
}
@Override
public NioDatagramChannelConfig getConfig() {
public DatagramChannelConfig config() {
return config;
}
@Override
public ChannelFuture joinGroup(InetAddress multicastAddress) {
try {
return joinGroup(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null);
} catch (SocketException e) {
return Channels.failedFuture(this, e);
public boolean isActive() {
DatagramChannel ch = javaChannel();
return ch.isOpen() && ch.socket().isBound();
}
@Override
protected DatagramChannel javaChannel() {
return (DatagramChannel) super.javaChannel();
}
@Override
protected ChannelBufferHolder<Object> firstOut() {
return out;
}
@Override
protected SocketAddress localAddress0() {
return javaChannel().socket().getLocalSocketAddress();
}
@Override
protected SocketAddress remoteAddress0() {
return javaChannel().socket().getRemoteSocketAddress();
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().bind(localAddress);
selectionKey().interestOps(SelectionKey.OP_READ);
}
@Override
protected boolean doConnect(SocketAddress remoteAddress,
SocketAddress localAddress) throws Exception {
if (localAddress != null) {
javaChannel().socket().bind(localAddress);
}
boolean success = false;
try {
javaChannel().connect(remoteAddress);
selectionKey().interestOps(selectionKey().interestOps() | SelectionKey.OP_READ);
success = true;
return true;
} finally {
if (!success) {
doClose();
}
}
}
@Override
public ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
return joinGroup(multicastAddress.getAddress(), networkInterface, null);
protected void doFinishConnect() throws Exception {
throw new Error();
}
/**
* Joins the specified multicast group at the specified interface using the specified source.
*/
public ChannelFuture joinGroup(InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
@Override
protected void doDisconnect() throws Exception {
javaChannel().disconnect();
}
@Override
protected void doClose() throws Exception {
javaChannel().close();
}
@Override
protected void doDeregister() throws Exception {
selectionKey().cancel();
((SingleThreadSelectorEventLoop) eventLoop()).cancelledKeys ++;
}
@Override
protected int doRead(ChannelBufferHolder<Object> buf) throws Exception {
DatagramChannel ch = javaChannel();
ByteBuffer data = ByteBuffer.allocate(1024);
InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(data);
if (remoteAddress == null) {
return 0;
}
data.flip();
buf.messageBuffer().add(new DatagramPacket(ChannelBuffers.wrappedBuffer(data), remoteAddress));
return 1;
}
@Override
protected int doFlush(boolean lastSpin) throws Exception {
final Queue<Object> buf = unsafe().out().messageBuffer();
if (buf.isEmpty()) {
return 0;
}
DatagramPacket packet = (DatagramPacket) buf.peek();
final int writtenBytes = javaChannel().send(packet.data().toByteBuffer(), packet.remoteAddress());
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
if (writtenBytes <= 0) {
// Did not write a packet.
// 1) If 'lastSpin' is false, the caller will call this method again real soon.
// - Do not update OP_WRITE.
// 2) If 'lastSpin' is true, the caller will not retry.
// - Set OP_WRITE so that the event loop calls flushForcibly() later.
if (lastSpin) {
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
}
return 0;
}
// Wrote a packet.
buf.remove();
if (buf.isEmpty()) {
// Wrote the outbound buffer completely - clear OP_WRITE.
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
}
return 1;
}
@Override
public ChannelFuture joinGroup(InetAddress multicastAddress) {
return joinGroup(multicastAddress, newFuture());
}
@Override
public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelFuture future) {
try {
return joinGroup(
multicastAddress,
NetworkInterface.getByInetAddress(localAddress().getAddress()),
null, future);
} catch (SocketException e) {
future.setFailure(e);
}
return future;
}
@Override
public ChannelFuture joinGroup(
InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
return joinGroup(multicastAddress, networkInterface, newFuture());
}
@Override
public ChannelFuture joinGroup(
InetSocketAddress multicastAddress, NetworkInterface networkInterface,
ChannelFuture future) {
return joinGroup(multicastAddress.getAddress(), networkInterface, null, future);
}
@Override
public ChannelFuture joinGroup(
InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
return joinGroup(multicastAddress, networkInterface, source, newFuture());
}
@Override
public ChannelFuture joinGroup(
InetAddress multicastAddress, NetworkInterface networkInterface,
InetAddress source, ChannelFuture future) {
if (DetectionUtil.javaVersion() < 7) {
throw new UnsupportedOperationException();
} else {
if (multicastAddress == null) {
throw new NullPointerException("multicastAddress");
}
if (networkInterface == null) {
throw new NullPointerException("networkInterface");
}
try {
MembershipKey key;
if (source == null) {
key = getJdkChannel().getChannel().join(multicastAddress, networkInterface);
key = javaChannel().join(multicastAddress, networkInterface);
} else {
key = getJdkChannel().getChannel().join(multicastAddress, networkInterface, source);
key = javaChannel().join(multicastAddress, networkInterface, source);
}
synchronized (this) {
if (memberships == null) {
memberships = new HashMap<InetAddress, List<MembershipKey>>();
}
List<MembershipKey> keys = memberships.get(multicastAddress);
if (keys == null) {
keys = new ArrayList<MembershipKey>();
@ -183,77 +290,106 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n
}
keys.add(key);
}
future.setSuccess();
} catch (Throwable e) {
return Channels.failedFuture(this, e);
future.setFailure(e);
}
}
return Channels.succeededFuture(this);
return future;
}
@Override
public ChannelFuture leaveGroup(InetAddress multicastAddress) {
try {
return leaveGroup(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null);
} catch (SocketException e) {
return Channels.failedFuture(this, e);
}
return leaveGroup(multicastAddress, newFuture());
}
@Override
public ChannelFuture leaveGroup(InetSocketAddress multicastAddress,
NetworkInterface networkInterface) {
return leaveGroup(multicastAddress.getAddress(), networkInterface, null);
public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelFuture future) {
try {
return leaveGroup(multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, future);
} catch (SocketException e) {
future.setFailure(e);
}
return future;
}
/**
* Leave the specified multicast group at the specified interface using the specified source.
*/
public ChannelFuture leaveGroup(InetAddress multicastAddress,
NetworkInterface networkInterface, InetAddress source) {
@Override
public ChannelFuture leaveGroup(
InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
return leaveGroup(multicastAddress, networkInterface, newFuture());
}
@Override
public ChannelFuture leaveGroup(
InetSocketAddress multicastAddress,
NetworkInterface networkInterface, ChannelFuture future) {
return leaveGroup(multicastAddress.getAddress(), networkInterface, null, future);
}
@Override
public ChannelFuture leaveGroup(
InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
return leaveGroup(multicastAddress, networkInterface, source, newFuture());
}
@Override
public ChannelFuture leaveGroup(
InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
ChannelFuture future) {
if (DetectionUtil.javaVersion() < 7) {
throw new UnsupportedOperationException();
} else {
if (multicastAddress == null) {
throw new NullPointerException("multicastAddress");
}
if (networkInterface == null) {
throw new NullPointerException("networkInterface");
}
synchronized (this) {
if (memberships != null) {
List<MembershipKey> keys = memberships.get(multicastAddress);
if (keys != null) {
Iterator<MembershipKey> keyIt = keys.iterator();
while (keyIt.hasNext()) {
MembershipKey key = keyIt.next();
if (networkInterface.equals(key.networkInterface())) {
if (source == null && key.sourceAddress() == null || (source != null && source.equals(key.sourceAddress()))) {
key.drop();
keyIt.remove();
}
}
}
if (keys.isEmpty()) {
memberships.remove(multicastAddress);
}
if (multicastAddress == null) {
throw new NullPointerException("multicastAddress");
}
if (networkInterface == null) {
throw new NullPointerException("networkInterface");
}
synchronized (this) {
if (memberships != null) {
List<MembershipKey> keys = memberships.get(multicastAddress);
if (keys != null) {
Iterator<MembershipKey> keyIt = keys.iterator();
while (keyIt.hasNext()) {
MembershipKey key = keyIt.next();
if (networkInterface.equals(key.networkInterface())) {
if (source == null && key.sourceAddress() == null || source != null && source.equals(key.sourceAddress())) {
key.drop();
keyIt.remove();
}
}
}
if (keys.isEmpty()) {
memberships.remove(multicastAddress);
}
}
}
return Channels.succeededFuture(this);
}
future.setSuccess();
return future;
}
/**
* Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
*
*/
public ChannelFuture block(InetAddress multicastAddress,
NetworkInterface networkInterface, InetAddress sourceToBlock) {
@Override
public ChannelFuture block(
InetAddress multicastAddress, NetworkInterface networkInterface,
InetAddress sourceToBlock) {
return block(multicastAddress, networkInterface, sourceToBlock, newFuture());
}
/**
* Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
*/
@Override
public ChannelFuture block(
InetAddress multicastAddress, NetworkInterface networkInterface,
InetAddress sourceToBlock, ChannelFuture future) {
if (DetectionUtil.javaVersion() < 7) {
throw new UnsupportedOperationException();
} else {
@ -263,7 +399,7 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n
if (sourceToBlock == null) {
throw new NullPointerException("sourceToBlock");
}
if (networkInterface == null) {
throw new NullPointerException("networkInterface");
}
@ -275,39 +411,41 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n
try {
key.block(sourceToBlock);
} catch (IOException e) {
return Channels.failedFuture(this, e);
future.setFailure(e);
}
}
}
}
}
return Channels.succeededFuture(this);
future.setSuccess();
return future;
}
}
/**
* Block the given sourceToBlock address for the given multicastAddress
*
*
*/
public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
try {
block(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), sourceToBlock);
} catch (SocketException e) {
return Channels.failedFuture(this, e);
}
return Channels.succeededFuture(this);
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
return super.write(message, null);
} else {
return super.write(message, remoteAddress);
}
public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
return block(multicastAddress, sourceToBlock, newFuture());
}
/**
* Block the given sourceToBlock address for the given multicastAddress
*
*/
@Override
public ChannelFuture block(
InetAddress multicastAddress, InetAddress sourceToBlock, ChannelFuture future) {
try {
return block(
multicastAddress,
NetworkInterface.getByInetAddress(localAddress().getAddress()),
sourceToBlock, future);
} catch (SocketException e) {
future.setFailure(e);
}
return future;
}
}

View File

@ -1,169 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import static io.netty.channel.Channels.*;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelState;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.MessageEvent;
/**
* Receives downstream events from a {@link ChannelPipeline}. It contains
* an array of I/O workers.
*/
class NioDatagramPipelineSink extends AbstractNioChannelSink {
/**
* Handle downstream event.
*
* @param pipeline the {@link ChannelPipeline} that passes down the
* downstream event.
* @param e The downstream event.
*/
@Override
public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e)
throws Exception {
final NioDatagramChannel channel = (NioDatagramChannel) e.getChannel();
final ChannelFuture future = e.getFuture();
if (e instanceof ChannelStateEvent) {
final ChannelStateEvent stateEvent = (ChannelStateEvent) e;
final ChannelState state = stateEvent.getState();
final Object value = stateEvent.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
channel.getWorker().close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (InetSocketAddress) value);
} else {
channel.getWorker().close(channel, future);
}
break;
case CONNECTED:
if (value != null) {
connect(channel, future, (InetSocketAddress) value);
} else {
channel.getWorker().disconnect(channel, future);
}
break;
case INTEREST_OPS:
channel.getWorker().setInterestOps(channel, future, ((Integer) value).intValue());
break;
}
} else if (e instanceof MessageEvent) {
final MessageEvent event = (MessageEvent) e;
final boolean offered = channel.writeBufferQueue.offer(event);
assert offered;
channel.getWorker().writeFromUserCode(channel);
}
}
private void close(NioDatagramChannel channel, ChannelFuture future) {
try {
channel.getJdkChannel().closeSocket();
if (channel.setClosed()) {
future.setSuccess();
if (channel.isBound()) {
fireChannelUnbound(channel);
}
fireChannelClosed(channel);
} else {
future.setSuccess();
}
} catch (final Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
/**
* Will bind the DatagramSocket to the passed-in address.
* Every call bind will spawn a new thread using the that basically in turn
*/
private void bind(final NioDatagramChannel channel,
final ChannelFuture future, final InetSocketAddress address) {
boolean bound = false;
boolean started = false;
try {
// First bind the DatagramSocket the specified port.
channel.getJdkChannel().bind(address);
bound = true;
future.setSuccess();
fireChannelBound(channel, address);
channel.getWorker().registerWithWorker(channel, null);
started = true;
} catch (final Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!started && bound) {
close(channel, future);
}
}
}
private void connect(
NioDatagramChannel channel, ChannelFuture future,
SocketAddress remoteAddress) {
boolean bound = channel.isBound();
boolean connected = false;
boolean workerStarted = false;
future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
// Clear the cached address so that the next getRemoteAddress() call
// updates the cache.
channel.remoteAddress = null;
try {
channel.getJdkChannel().connect(remoteAddress);
connected = true;
// Fire events.
future.setSuccess();
if (!bound) {
fireChannelBound(channel, channel.getLocalAddress());
}
fireChannelConnected(channel, channel.getRemoteAddress());
if (!bound) {
channel.getWorker().registerWithWorker(channel, future);
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (connected && !workerStarted) {
channel.getWorker().close(channel, future);
}
}
}
}

View File

@ -1,303 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import static io.netty.channel.Channels.fireChannelDisconnected;
import static io.netty.channel.Channels.fireChannelDisconnectedLater;
import static io.netty.channel.Channels.fireExceptionCaught;
import static io.netty.channel.Channels.fireExceptionCaughtLater;
import static io.netty.channel.Channels.fireMessageReceived;
import static io.netty.channel.Channels.succeededFuture;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.Channels;
import io.netty.channel.MessageEvent;
import io.netty.channel.ReceiveBufferSizePredictor;
import io.netty.channel.socket.nio.SendBufferPool.SendBuffer;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.Executor;
/**
* A class responsible for registering channels with {@link Selector}.
* It also implements the {@link Selector} loop.
*/
public class NioDatagramWorker extends SingleThreadSelectorEventLoop {
/**
* Sole constructor.
*
* @param executor the {@link Executor} used to execute {@link Runnable}s
* such as {@link ChannelRegistionTask}
*/
NioDatagramWorker(final Executor executor) {
super(executor);
}
NioDatagramWorker(final Executor executor, boolean allowShutdownOnIdle) {
super(executor, allowShutdownOnIdle);
}
@Override
protected boolean read(final SelectionKey key) {
final NioDatagramChannel channel = (NioDatagramChannel) key.attachment();
ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
final DatagramChannel nioChannel = (DatagramChannel) key.channel();
// Allocating a non-direct buffer with a max udp packge size.
// Would using a direct buffer be more efficient or would this negatively
// effect performance, as direct buffer allocation has a higher upfront cost
// where as a ByteBuffer is heap allocated.
final ByteBuffer byteBuffer = ByteBuffer.allocate(
predictor.nextReceiveBufferSize()).order(bufferFactory.getDefaultOrder());
boolean failure = true;
SocketAddress remoteAddress = null;
try {
// Receive from the channel in a non blocking mode. We have already been notified that
// the channel is ready to receive.
remoteAddress = nioChannel.receive(byteBuffer);
failure = false;
} catch (ClosedChannelException e) {
// Can happen, and does not need a user attention.
} catch (Throwable t) {
fireExceptionCaught(channel, t);
}
if (remoteAddress != null) {
// Flip the buffer so that we can wrap it.
byteBuffer.flip();
int readBytes = byteBuffer.remaining();
if (readBytes > 0) {
// Update the predictor.
predictor.previousReceiveBufferSize(readBytes);
// Notify the interested parties about the newly arrived message.
fireMessageReceived(
channel, bufferFactory.getBuffer(byteBuffer), remoteAddress);
}
}
if (failure) {
key.cancel(); // Some JDK implementations run into an infinite loop without this.
close(channel, succeededFuture(channel));
return false;
}
return true;
}
void disconnect(NioDatagramChannel channel, ChannelFuture future) {
boolean connected = channel.isConnected();
boolean iothread = isIoThread();
try {
channel.getJdkChannel().disconnectSocket();
future.setSuccess();
if (connected) {
if (iothread) {
fireChannelDisconnected(channel);
} else {
fireChannelDisconnectedLater(channel);
}
}
} catch (Throwable t) {
future.setFailure(t);
if (iothread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
@Override
protected void registerTask(AbstractNioChannel channel, ChannelFuture future) {
final SocketAddress localAddress = channel.getLocalAddress();
if (localAddress == null) {
if (future != null) {
future.setFailure(new ClosedChannelException());
}
close(channel, succeededFuture(channel));
return;
}
try {
synchronized (channel.interestOpsLock) {
channel.getJdkChannel().register(
selector, channel.getRawInterestOps(), channel);
}
if (future != null) {
future.setSuccess();
}
} catch (final ClosedChannelException e) {
if (future != null) {
future.setFailure(e);
}
close(channel, succeededFuture(channel));
throw new ChannelException(
"Failed to register a socket to the selector.", e);
}
}
@Override
public void writeFromUserCode(final AbstractNioChannel channel) {
/*
* Note that we are not checking if the channel is connected. Connected
* has a different meaning in UDP and means that the channels socket is
* configured to only send and receive from a given remote peer.
*/
if (!channel.isBound()) {
cleanUpWriteBuffer(channel);
return;
}
if (scheduleWriteIfNecessary(channel)) {
return;
}
// From here, we are sure Thread.currentThread() == workerThread.
if (channel.writeSuspended) {
return;
}
if (channel.inWriteNowLoop) {
return;
}
write0(channel);
}
@Override
protected void write0(final AbstractNioChannel channel) {
boolean addOpWrite = false;
boolean removeOpWrite = false;
long writtenBytes = 0;
final SendBufferPool sendBufferPool = this.sendBufferPool;
final DatagramChannel ch = ((NioDatagramChannel) channel).getJdkChannel().getChannel();
final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
synchronized (channel.writeLock) {
// inform the channel that write is in-progress
channel.inWriteNowLoop = true;
// loop forever...
for (;;) {
MessageEvent evt = channel.currentWriteEvent;
SendBuffer buf;
if (evt == null) {
if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
removeOpWrite = true;
channel.writeSuspended = false;
break;
}
channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
} else {
buf = channel.currentWriteBuffer;
}
try {
long localWrittenBytes = 0;
SocketAddress raddr = evt.getRemoteAddress();
if (raddr == null) {
for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.transferTo(ch);
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
}
if (buf.finished()) {
break;
}
}
} else {
for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.transferTo(ch, raddr);
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
}
if (buf.finished()) {
break;
}
}
}
if (localWrittenBytes > 0 || buf.finished()) {
// Successful write - proceed to the next message.
buf.release();
ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
evt = null;
buf = null;
future.setSuccess();
} else {
// Not written at all - perhaps the kernel buffer is full.
addOpWrite = true;
channel.writeSuspended = true;
break;
}
} catch (final AsynchronousCloseException e) {
// Doesn't need a user attention - ignore.
} catch (final Throwable t) {
buf.release();
ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
buf = null;
evt = null;
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
channel.inWriteNowLoop = false;
// Initially, the following block was executed after releasing
// the writeLock, but there was a race condition, and it has to be
// executed before releasing the writeLock:
//
// https://issues.jboss.org/browse/NETTY-410
//
if (addOpWrite) {
setOpWrite(channel);
} else if (removeOpWrite) {
clearOpWrite(channel);
}
}
Channels.fireWriteComplete(channel, writtenBytes);
}
}