diff --git a/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoClient.java b/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoClient.java index e7cfca947f..29407de301 100644 --- a/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoClient.java +++ b/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoClient.java @@ -25,12 +25,13 @@ import io.netty.example.udt.util.UtilConsoleReporter; import io.netty.example.udt.util.UtilThreadFactory; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * UDT Byte Stream Client *

@@ -62,7 +63,7 @@ public class ByteEchoClient { final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1, connectFactory, NioUdtProvider.BYTE_PROVIDER); try { - boot.group(connectGroup) + boot.group(connectGroup).localAddress(0) .channelFactory(NioUdtProvider.BYTE_CONNECTOR) .handler(new ChannelInitializer() { @Override diff --git a/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoClient.java b/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoClient.java index 127c194a23..d9ac29b323 100644 --- a/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoClient.java +++ b/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoClient.java @@ -25,12 +25,13 @@ import io.netty.example.udt.util.UtilConsoleReporter; import io.netty.example.udt.util.UtilThreadFactory; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * UDT Message Flow client *

@@ -62,7 +63,7 @@ public class MsgEchoClient { final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1, connectFactory, NioUdtProvider.MESSAGE_PROVIDER); try { - boot.group(connectGroup) + boot.group(connectGroup).localAddress(0) .channelFactory(NioUdtProvider.MESSAGE_CONNECTOR) .handler(new ChannelInitializer() { @Override diff --git a/transport-udt/src/main/java/io/netty/channel/udt/DefaultUdtChannelConfig.java b/transport-udt/src/main/java/io/netty/channel/udt/DefaultUdtChannelConfig.java index 681a876415..044e2453cd 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/DefaultUdtChannelConfig.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/DefaultUdtChannelConfig.java @@ -44,7 +44,6 @@ public class DefaultUdtChannelConfig extends DefaultChannelConfig implements private volatile int allocatorReceiveBufferSize = 128 * K; private volatile int allocatorSendBufferSize = 128 * K; - private volatile int backlog = 64; private volatile int soLinger; private volatile boolean reuseAddress = true; @@ -82,11 +81,6 @@ public class DefaultUdtChannelConfig extends DefaultChannelConfig implements return protocolReceiveBuferSize; } - @Override - public int getBacklog() { - return backlog; - } - @SuppressWarnings("unchecked") @Override public T getOption(final ChannelOption option) { @@ -114,9 +108,6 @@ public class DefaultUdtChannelConfig extends DefaultChannelConfig implements if (option == SO_LINGER) { return (T) Integer.valueOf(getSoLinger()); } - if (option == SO_BACKLOG) { - return (T) Integer.valueOf(getBacklog()); - } return super.getOption(option); } @@ -125,7 +116,7 @@ public class DefaultUdtChannelConfig extends DefaultChannelConfig implements return getOptions(super.getOptions(), PROTOCOL_RECEIVE_BUFFER_SIZE, PROTOCOL_SEND_BUFFER_SIZE, SYSTEM_RECEIVE_BUFFER_SIZE, SYSTEM_SEND_BUFFER_SIZE, SO_RCVBUF, SO_SNDBUF, SO_REUSEADDR, - SO_LINGER, SO_BACKLOG); + SO_LINGER); } @Override @@ -154,12 +145,6 @@ public class DefaultUdtChannelConfig extends DefaultChannelConfig implements return this; } - @Override - public UdtChannelConfig setBacklog(final int backlog) { - this.backlog = backlog; - return this; - } - @Override public boolean setOption(final ChannelOption option, final T value) { validate(option, value); @@ -179,8 +164,6 @@ public class DefaultUdtChannelConfig extends DefaultChannelConfig implements setReuseAddress((Boolean) value); } else if (option == SO_LINGER) { setSoLinger((Integer) value); - } else if (option == SO_BACKLOG) { - setBacklog((Integer) value); } else { return super.setOption(option, value); } diff --git a/transport-udt/src/main/java/io/netty/channel/udt/DefaultUdtServerChannelConfig.java b/transport-udt/src/main/java/io/netty/channel/udt/DefaultUdtServerChannelConfig.java new file mode 100644 index 0000000000..f20b309bc7 --- /dev/null +++ b/transport-udt/src/main/java/io/netty/channel/udt/DefaultUdtServerChannelConfig.java @@ -0,0 +1,139 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.udt; + +import static io.netty.channel.ChannelOption.*; +import io.netty.channel.ChannelOption; + +import java.io.IOException; +import java.util.Map; + +import com.barchart.udt.SocketUDT; +import com.barchart.udt.nio.ChannelUDT; + +/** + * The default {@link UdtServerChannelConfig} implementation. + */ +public class DefaultUdtServerChannelConfig extends DefaultUdtChannelConfig + implements UdtServerChannelConfig { + + private volatile int backlog = 64; + + public DefaultUdtServerChannelConfig(final UdtChannel channel, + final ChannelUDT channelUDT, final boolean apply) + throws IOException { + super(channel, channelUDT, apply); + if (apply) { + apply(channelUDT); + } + } + + @Override + protected void apply(final ChannelUDT channelUDT) throws IOException { + final SocketUDT socketUDT = channelUDT.socketUDT(); + // nothing to apply for now. + } + + @Override + public int getBacklog() { + return backlog; + } + + @SuppressWarnings("unchecked") + @Override + public T getOption(final ChannelOption option) { + if (option == SO_BACKLOG) { + return (T) Integer.valueOf(getBacklog()); + } + return super.getOption(option); + } + + @Override + public Map, Object> getOptions() { + return getOptions(super.getOptions(), SO_BACKLOG); + } + + @Override + public UdtServerChannelConfig setBacklog(final int backlog) { + this.backlog = backlog; + return this; + } + + @Override + public boolean setOption(final ChannelOption option, final T value) { + validate(option, value); + if (option == SO_BACKLOG) { + setBacklog((Integer) value); + } else { + return super.setOption(option, value); + } + return true; + } + + @Override + public UdtServerChannelConfig setProtocolReceiveBufferSize( + final int protocolReceiveBuferSize) { + super.setProtocolReceiveBufferSize(protocolReceiveBuferSize); + return this; + } + + @Override + public UdtServerChannelConfig setProtocolSendBufferSize( + final int protocolSendBuferSize) { + super.setProtocolSendBufferSize(protocolSendBuferSize); + return this; + } + + @Override + public UdtServerChannelConfig setReceiveBufferSize( + final int receiveBufferSize) { + super.setReceiveBufferSize(receiveBufferSize); + return this; + } + + @Override + public UdtServerChannelConfig setReuseAddress(final boolean reuseAddress) { + super.setReuseAddress(reuseAddress); + return this; + } + + @Override + public UdtServerChannelConfig setSendBufferSize(final int sendBufferSize) { + super.setSendBufferSize(sendBufferSize); + return this; + } + + @Override + public UdtServerChannelConfig setSoLinger(final int soLinger) { + super.setSoLinger(soLinger); + return this; + } + + @Override + public UdtServerChannelConfig setSystemReceiveBufferSize( + final int systemSendBuferSize) { + super.setSystemReceiveBufferSize(systemSendBuferSize); + return this; + } + + @Override + public UdtServerChannelConfig setSystemSendBufferSize( + final int systemReceiveBufferSize) { + super.setSystemSendBufferSize(systemReceiveBufferSize); + return this; + } + +} diff --git a/transport-udt/src/main/java/io/netty/channel/udt/UdtChannelConfig.java b/transport-udt/src/main/java/io/netty/channel/udt/UdtChannelConfig.java index 71264f8293..97fe96582b 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/UdtChannelConfig.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/UdtChannelConfig.java @@ -71,11 +71,6 @@ import com.barchart.udt.nio.KindUDT; */ public interface UdtChannelConfig extends ChannelConfig { - /** - * Gets {@link KindUDT#ACCEPTOR} channel backlog. - */ - int getBacklog(); - /** * Gets {@link OptionUDT#Protocol_Receive_Buffer_Size} */ @@ -116,11 +111,6 @@ public interface UdtChannelConfig extends ChannelConfig { */ boolean isReuseAddress(); - /** - * Sets {@link KindUDT#ACCEPTOR} channel backlog. - */ - UdtChannelConfig setBacklog(int backlog); - /** * Sets {@link OptionUDT#Protocol_Receive_Buffer_Size} */ diff --git a/transport-udt/src/main/java/io/netty/channel/udt/UdtServerChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/UdtServerChannel.java new file mode 100644 index 0000000000..b0b9532be7 --- /dev/null +++ b/transport-udt/src/main/java/io/netty/channel/udt/UdtServerChannel.java @@ -0,0 +1,30 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.udt; + +import io.netty.channel.ServerChannel; +import io.netty.channel.udt.nio.NioUdtProvider; + +import java.net.InetSocketAddress; + +/** + * UDT {@link ServerChannel}. + *

+ * Supported UDT {@link UdtServerChannel} are available via {@link NioUdtProvider}. + */ +public interface UdtServerChannel extends ServerChannel, UdtChannel { + +} diff --git a/transport-udt/src/main/java/io/netty/channel/udt/UdtServerChannelConfig.java b/transport-udt/src/main/java/io/netty/channel/udt/UdtServerChannelConfig.java new file mode 100644 index 0000000000..764e82b1f8 --- /dev/null +++ b/transport-udt/src/main/java/io/netty/channel/udt/UdtServerChannelConfig.java @@ -0,0 +1,71 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.udt; + +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelOption; + +import com.barchart.udt.TypeUDT; +import com.barchart.udt.nio.KindUDT; + +/** + * A {@link ChannelConfig} for a {@link UdtServerChannel}. + *

+ * Note that {@link TypeUDT#DATAGRAM} message oriented channels treat + * {@code "receiveBufferSize"} and {@code "sendBufferSize"} as maximum message + * size. If received or sent message does not fit specified sizes, + * {@link ChannelException} will be thrown. + */ +public interface UdtServerChannelConfig extends UdtChannelConfig { + + /** + * Gets {@link KindUDT#ACCEPTOR} channel backlog via + * {@link ChannelOption#SO_BACKLOG}. + */ + int getBacklog(); + + /** + * Sets {@link KindUDT#ACCEPTOR} channel backlog via + * {@link ChannelOption#SO_BACKLOG}. + */ + UdtServerChannelConfig setBacklog(int backlog); + + @Override + UdtServerChannelConfig setProtocolReceiveBufferSize(int size); + + @Override + UdtServerChannelConfig setProtocolSendBufferSize(int size); + + @Override + UdtServerChannelConfig setReceiveBufferSize(int receiveBufferSize); + + @Override + UdtServerChannelConfig setReuseAddress(boolean reuseAddress); + + @Override + UdtServerChannelConfig setSendBufferSize(int sendBufferSize); + + @Override + UdtServerChannelConfig setSoLinger(int soLinger); + + @Override + UdtServerChannelConfig setSystemReceiveBufferSize(int size); + + @Override + UdtServerChannelConfig setSystemSendBufferSize(int size); + +} diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java index 6c2d572bcb..4d779b5105 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java @@ -21,9 +21,10 @@ import io.netty.channel.ChannelException; import io.netty.channel.socket.nio.AbstractNioMessageChannel; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; -import io.netty.channel.udt.DefaultUdtChannelConfig; -import io.netty.channel.udt.UdtChannel; +import io.netty.channel.udt.DefaultUdtServerChannelConfig; +import io.netty.channel.udt.UdtServerChannel; import io.netty.channel.udt.UdtChannelConfig; +import io.netty.channel.udt.UdtServerChannelConfig; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -35,18 +36,18 @@ import com.barchart.udt.nio.ServerSocketChannelUDT; * Common base for Netty Byte/Message UDT Stream/Datagram acceptors. */ public abstract class NioUdtAcceptorChannel extends AbstractNioMessageChannel - implements UdtChannel { + implements UdtServerChannel { protected static final InternalLogger logger = InternalLoggerFactory .getInstance(NioUdtAcceptorChannel.class); - private final UdtChannelConfig config; + private final UdtServerChannelConfig config; protected NioUdtAcceptorChannel(final ServerSocketChannelUDT channelUDT) { super(null, channelUDT.socketUDT().id(), channelUDT, OP_ACCEPT); try { channelUDT.configureBlocking(false); - config = new DefaultUdtChannelConfig(this, channelUDT, true); + config = new DefaultUdtServerChannelConfig(this, channelUDT, true); } catch (final Exception e) { try { channelUDT.close(); @@ -55,7 +56,7 @@ public abstract class NioUdtAcceptorChannel extends AbstractNioMessageChannel logger.warn("Failed to close channel.", e2); } } - throw new ChannelException("Failed configure channel.", e); + throw new ChannelException("Failed to configure channel.", e); } } @@ -64,7 +65,7 @@ public abstract class NioUdtAcceptorChannel extends AbstractNioMessageChannel } @Override - public UdtChannelConfig config() { + public UdtServerChannelConfig config() { return config; } diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteRendezvousChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteRendezvousChannel.java index ae36e31f97..d0647dd9d3 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteRendezvousChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtByteRendezvousChannel.java @@ -21,7 +21,9 @@ import com.barchart.udt.TypeUDT; * Byte Channel Rendezvous for UDT Streams. */ public class NioUdtByteRendezvousChannel extends NioUdtByteConnectorChannel { + public NioUdtByteRendezvousChannel() { super(NioUdtProvider.newRendezvousChannelUDT(TypeUDT.STREAM)); } + } diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtProvider.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtProvider.java index 9333c28cfe..c2c8d00172 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtProvider.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtProvider.java @@ -26,6 +26,7 @@ import com.barchart.udt.nio.SocketChannelUDT; import io.netty.bootstrap.ChannelFactory; import io.netty.channel.Channel; import io.netty.channel.ChannelException; +import io.netty.channel.udt.UdtServerChannel; import io.netty.channel.udt.UdtChannel; import java.io.IOException; @@ -38,20 +39,20 @@ import java.nio.channels.spi.SelectorProvider; *

* Provides {@link SelectorProvider} for UDT channels. */ -public final class NioUdtProvider implements ChannelFactory { +public final class NioUdtProvider implements ChannelFactory { /** * {@link ChannelFactory} for UDT Byte Acceptor. See {@link TypeUDT#STREAM} * and {@link KindUDT#ACCEPTOR}. */ - public static final ChannelFactory BYTE_ACCEPTOR = new NioUdtProvider( + public static final ChannelFactory BYTE_ACCEPTOR = new NioUdtProvider( TypeUDT.STREAM, KindUDT.ACCEPTOR); /** * {@link ChannelFactory} for UDT Byte Connector. See {@link TypeUDT#STREAM} * and {@link KindUDT#CONNECTOR}. */ - public static final ChannelFactory BYTE_CONNECTOR = new NioUdtProvider( + public static final ChannelFactory BYTE_CONNECTOR = new NioUdtProvider( TypeUDT.STREAM, KindUDT.CONNECTOR); /** @@ -64,21 +65,21 @@ public final class NioUdtProvider implements ChannelFactory { * {@link ChannelFactory} for UDT Byte Rendezvous. See * {@link TypeUDT#STREAM} and {@link KindUDT#RENDEZVOUS}. */ - public static final ChannelFactory BYTE_RENDEZVOUS = new NioUdtProvider( + public static final ChannelFactory BYTE_RENDEZVOUS = new NioUdtProvider( TypeUDT.STREAM, KindUDT.RENDEZVOUS); /** * {@link ChannelFactory} for UDT Message Acceptor. See * {@link TypeUDT#DATAGRAM} and {@link KindUDT#ACCEPTOR}. */ - public static final ChannelFactory MESSAGE_ACCEPTOR = new NioUdtProvider( + public static final ChannelFactory MESSAGE_ACCEPTOR = new NioUdtProvider( TypeUDT.DATAGRAM, KindUDT.ACCEPTOR); /** * {@link ChannelFactory} for UDT Message Connector. See * {@link TypeUDT#DATAGRAM} and {@link KindUDT#CONNECTOR}. */ - public static final ChannelFactory MESSAGE_CONNECTOR = new NioUdtProvider( + public static final ChannelFactory MESSAGE_CONNECTOR = new NioUdtProvider( TypeUDT.DATAGRAM, KindUDT.CONNECTOR); /** @@ -91,7 +92,7 @@ public final class NioUdtProvider implements ChannelFactory { * {@link ChannelFactory} for UDT Message Rendezvous. See * {@link TypeUDT#DATAGRAM} and {@link KindUDT#RENDEZVOUS}. */ - public static final ChannelFactory MESSAGE_RENDEZVOUS = new NioUdtProvider( + public static final ChannelFactory MESSAGE_RENDEZVOUS = new NioUdtProvider( TypeUDT.DATAGRAM, KindUDT.RENDEZVOUS); /** @@ -196,33 +197,34 @@ public final class NioUdtProvider implements ChannelFactory { * Produce new {@link UdtChannel} based on factory {@link #kind()} and * {@link #type()} */ + @SuppressWarnings("unchecked") @Override - public UdtChannel newChannel() { + public T newChannel() { switch (kind) { case ACCEPTOR: switch (type) { case DATAGRAM: - return new NioUdtMessageAcceptorChannel(); + return (T) new NioUdtMessageAcceptorChannel(); case STREAM: - return new NioUdtByteAcceptorChannel(); + return (T) new NioUdtByteAcceptorChannel(); default: throw new IllegalStateException("wrong type=" + type); } case CONNECTOR: switch (type) { case DATAGRAM: - return new NioUdtMessageConnectorChannel(); + return (T) new NioUdtMessageConnectorChannel(); case STREAM: - return new NioUdtByteConnectorChannel(); + return (T) new NioUdtByteConnectorChannel(); default: throw new IllegalStateException("wrong type=" + type); } case RENDEZVOUS: switch (type) { case DATAGRAM: - return new NioUdtMessageRendezvousChannel(); + return (T) new NioUdtMessageRendezvousChannel(); case STREAM: - return new NioUdtByteRendezvousChannel(); + return (T) new NioUdtByteRendezvousChannel(); default: throw new IllegalStateException("wrong type=" + type); } diff --git a/transport-udt/src/test/java/io/netty/channel/udt/nio/NioUdtProviderTest.java b/transport-udt/src/test/java/io/netty/channel/udt/nio/NioUdtProviderTest.java index 842e829993..aa3f9e8378 100644 --- a/transport-udt/src/test/java/io/netty/channel/udt/nio/NioUdtProviderTest.java +++ b/transport-udt/src/test/java/io/netty/channel/udt/nio/NioUdtProviderTest.java @@ -16,6 +16,8 @@ package io.netty.channel.udt.nio; +import io.netty.channel.udt.UdtServerChannel; + import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +43,10 @@ public class NioUdtProviderTest extends AbstractUdtTest { assertNotNull(NioUdtProvider.MESSAGE_ACCEPTOR.newChannel()); assertNotNull(NioUdtProvider.MESSAGE_CONNECTOR.newChannel()); assertNotNull(NioUdtProvider.MESSAGE_RENDEZVOUS.newChannel()); + + // acceptor types + assertTrue(NioUdtProvider.BYTE_ACCEPTOR.newChannel() instanceof UdtServerChannel); + assertTrue(NioUdtProvider.MESSAGE_ACCEPTOR.newChannel() instanceof UdtServerChannel); }