[#999] Refactor UDT transport to make use of the ServerChannel interface

This commit is contained in:
Andrei Pozolotin 2013-01-30 13:24:21 -06:00 committed by Norman Maurer
parent 97ea338bce
commit 82f876f7db
11 changed files with 281 additions and 55 deletions

View File

@ -25,12 +25,13 @@ import io.netty.example.udt.util.UtilConsoleReporter;
import io.netty.example.udt.util.UtilThreadFactory;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* UDT Byte Stream Client
* <p>
@ -62,7 +63,7 @@ public class ByteEchoClient {
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1,
connectFactory, NioUdtProvider.BYTE_PROVIDER);
try {
boot.group(connectGroup)
boot.group(connectGroup).localAddress(0)
.channelFactory(NioUdtProvider.BYTE_CONNECTOR)
.handler(new ChannelInitializer<UdtChannel>() {
@Override

View File

@ -25,12 +25,13 @@ import io.netty.example.udt.util.UtilConsoleReporter;
import io.netty.example.udt.util.UtilThreadFactory;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* UDT Message Flow client
* <p>
@ -62,7 +63,7 @@ public class MsgEchoClient {
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1,
connectFactory, NioUdtProvider.MESSAGE_PROVIDER);
try {
boot.group(connectGroup)
boot.group(connectGroup).localAddress(0)
.channelFactory(NioUdtProvider.MESSAGE_CONNECTOR)
.handler(new ChannelInitializer<UdtChannel>() {
@Override

View File

@ -44,7 +44,6 @@ public class DefaultUdtChannelConfig extends DefaultChannelConfig implements
private volatile int allocatorReceiveBufferSize = 128 * K;
private volatile int allocatorSendBufferSize = 128 * K;
private volatile int backlog = 64;
private volatile int soLinger;
private volatile boolean reuseAddress = true;
@ -82,11 +81,6 @@ public class DefaultUdtChannelConfig extends DefaultChannelConfig implements
return protocolReceiveBuferSize;
}
@Override
public int getBacklog() {
return backlog;
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(final ChannelOption<T> option) {
@ -114,9 +108,6 @@ public class DefaultUdtChannelConfig extends DefaultChannelConfig implements
if (option == SO_LINGER) {
return (T) Integer.valueOf(getSoLinger());
}
if (option == SO_BACKLOG) {
return (T) Integer.valueOf(getBacklog());
}
return super.getOption(option);
}
@ -125,7 +116,7 @@ public class DefaultUdtChannelConfig extends DefaultChannelConfig implements
return getOptions(super.getOptions(), PROTOCOL_RECEIVE_BUFFER_SIZE,
PROTOCOL_SEND_BUFFER_SIZE, SYSTEM_RECEIVE_BUFFER_SIZE,
SYSTEM_SEND_BUFFER_SIZE, SO_RCVBUF, SO_SNDBUF, SO_REUSEADDR,
SO_LINGER, SO_BACKLOG);
SO_LINGER);
}
@Override
@ -154,12 +145,6 @@ public class DefaultUdtChannelConfig extends DefaultChannelConfig implements
return this;
}
@Override
public UdtChannelConfig setBacklog(final int backlog) {
this.backlog = backlog;
return this;
}
@Override
public <T> boolean setOption(final ChannelOption<T> option, final T value) {
validate(option, value);
@ -179,8 +164,6 @@ public class DefaultUdtChannelConfig extends DefaultChannelConfig implements
setReuseAddress((Boolean) value);
} else if (option == SO_LINGER) {
setSoLinger((Integer) value);
} else if (option == SO_BACKLOG) {
setBacklog((Integer) value);
} else {
return super.setOption(option, value);
}

View File

@ -0,0 +1,139 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.udt;
import static io.netty.channel.ChannelOption.*;
import io.netty.channel.ChannelOption;
import java.io.IOException;
import java.util.Map;
import com.barchart.udt.SocketUDT;
import com.barchart.udt.nio.ChannelUDT;
/**
* The default {@link UdtServerChannelConfig} implementation.
*/
public class DefaultUdtServerChannelConfig extends DefaultUdtChannelConfig
implements UdtServerChannelConfig {
private volatile int backlog = 64;
public DefaultUdtServerChannelConfig(final UdtChannel channel,
final ChannelUDT channelUDT, final boolean apply)
throws IOException {
super(channel, channelUDT, apply);
if (apply) {
apply(channelUDT);
}
}
@Override
protected void apply(final ChannelUDT channelUDT) throws IOException {
final SocketUDT socketUDT = channelUDT.socketUDT();
// nothing to apply for now.
}
@Override
public int getBacklog() {
return backlog;
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(final ChannelOption<T> option) {
if (option == SO_BACKLOG) {
return (T) Integer.valueOf(getBacklog());
}
return super.getOption(option);
}
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), SO_BACKLOG);
}
@Override
public UdtServerChannelConfig setBacklog(final int backlog) {
this.backlog = backlog;
return this;
}
@Override
public <T> boolean setOption(final ChannelOption<T> option, final T value) {
validate(option, value);
if (option == SO_BACKLOG) {
setBacklog((Integer) value);
} else {
return super.setOption(option, value);
}
return true;
}
@Override
public UdtServerChannelConfig setProtocolReceiveBufferSize(
final int protocolReceiveBuferSize) {
super.setProtocolReceiveBufferSize(protocolReceiveBuferSize);
return this;
}
@Override
public UdtServerChannelConfig setProtocolSendBufferSize(
final int protocolSendBuferSize) {
super.setProtocolSendBufferSize(protocolSendBuferSize);
return this;
}
@Override
public UdtServerChannelConfig setReceiveBufferSize(
final int receiveBufferSize) {
super.setReceiveBufferSize(receiveBufferSize);
return this;
}
@Override
public UdtServerChannelConfig setReuseAddress(final boolean reuseAddress) {
super.setReuseAddress(reuseAddress);
return this;
}
@Override
public UdtServerChannelConfig setSendBufferSize(final int sendBufferSize) {
super.setSendBufferSize(sendBufferSize);
return this;
}
@Override
public UdtServerChannelConfig setSoLinger(final int soLinger) {
super.setSoLinger(soLinger);
return this;
}
@Override
public UdtServerChannelConfig setSystemReceiveBufferSize(
final int systemSendBuferSize) {
super.setSystemReceiveBufferSize(systemSendBuferSize);
return this;
}
@Override
public UdtServerChannelConfig setSystemSendBufferSize(
final int systemReceiveBufferSize) {
super.setSystemSendBufferSize(systemReceiveBufferSize);
return this;
}
}

View File

@ -71,11 +71,6 @@ import com.barchart.udt.nio.KindUDT;
*/
public interface UdtChannelConfig extends ChannelConfig {
/**
* Gets {@link KindUDT#ACCEPTOR} channel backlog.
*/
int getBacklog();
/**
* Gets {@link OptionUDT#Protocol_Receive_Buffer_Size}
*/
@ -116,11 +111,6 @@ public interface UdtChannelConfig extends ChannelConfig {
*/
boolean isReuseAddress();
/**
* Sets {@link KindUDT#ACCEPTOR} channel backlog.
*/
UdtChannelConfig setBacklog(int backlog);
/**
* Sets {@link OptionUDT#Protocol_Receive_Buffer_Size}
*/

View File

@ -0,0 +1,30 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.udt;
import io.netty.channel.ServerChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
import java.net.InetSocketAddress;
/**
* UDT {@link ServerChannel}.
* <p>
* Supported UDT {@link UdtServerChannel} are available via {@link NioUdtProvider}.
*/
public interface UdtServerChannel extends ServerChannel, UdtChannel {
}

View File

@ -0,0 +1,71 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.udt;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import com.barchart.udt.TypeUDT;
import com.barchart.udt.nio.KindUDT;
/**
* A {@link ChannelConfig} for a {@link UdtServerChannel}.
* <p>
* Note that {@link TypeUDT#DATAGRAM} message oriented channels treat
* {@code "receiveBufferSize"} and {@code "sendBufferSize"} as maximum message
* size. If received or sent message does not fit specified sizes,
* {@link ChannelException} will be thrown.
*/
public interface UdtServerChannelConfig extends UdtChannelConfig {
/**
* Gets {@link KindUDT#ACCEPTOR} channel backlog via
* {@link ChannelOption#SO_BACKLOG}.
*/
int getBacklog();
/**
* Sets {@link KindUDT#ACCEPTOR} channel backlog via
* {@link ChannelOption#SO_BACKLOG}.
*/
UdtServerChannelConfig setBacklog(int backlog);
@Override
UdtServerChannelConfig setProtocolReceiveBufferSize(int size);
@Override
UdtServerChannelConfig setProtocolSendBufferSize(int size);
@Override
UdtServerChannelConfig setReceiveBufferSize(int receiveBufferSize);
@Override
UdtServerChannelConfig setReuseAddress(boolean reuseAddress);
@Override
UdtServerChannelConfig setSendBufferSize(int sendBufferSize);
@Override
UdtServerChannelConfig setSoLinger(int soLinger);
@Override
UdtServerChannelConfig setSystemReceiveBufferSize(int size);
@Override
UdtServerChannelConfig setSystemSendBufferSize(int size);
}

View File

@ -21,9 +21,10 @@ import io.netty.channel.ChannelException;
import io.netty.channel.socket.nio.AbstractNioMessageChannel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.channel.udt.DefaultUdtChannelConfig;
import io.netty.channel.udt.UdtChannel;
import io.netty.channel.udt.DefaultUdtServerChannelConfig;
import io.netty.channel.udt.UdtServerChannel;
import io.netty.channel.udt.UdtChannelConfig;
import io.netty.channel.udt.UdtServerChannelConfig;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@ -35,18 +36,18 @@ import com.barchart.udt.nio.ServerSocketChannelUDT;
* Common base for Netty Byte/Message UDT Stream/Datagram acceptors.
*/
public abstract class NioUdtAcceptorChannel extends AbstractNioMessageChannel
implements UdtChannel {
implements UdtServerChannel {
protected static final InternalLogger logger = InternalLoggerFactory
.getInstance(NioUdtAcceptorChannel.class);
private final UdtChannelConfig config;
private final UdtServerChannelConfig config;
protected NioUdtAcceptorChannel(final ServerSocketChannelUDT channelUDT) {
super(null, channelUDT.socketUDT().id(), channelUDT, OP_ACCEPT);
try {
channelUDT.configureBlocking(false);
config = new DefaultUdtChannelConfig(this, channelUDT, true);
config = new DefaultUdtServerChannelConfig(this, channelUDT, true);
} catch (final Exception e) {
try {
channelUDT.close();
@ -55,7 +56,7 @@ public abstract class NioUdtAcceptorChannel extends AbstractNioMessageChannel
logger.warn("Failed to close channel.", e2);
}
}
throw new ChannelException("Failed configure channel.", e);
throw new ChannelException("Failed to configure channel.", e);
}
}
@ -64,7 +65,7 @@ public abstract class NioUdtAcceptorChannel extends AbstractNioMessageChannel
}
@Override
public UdtChannelConfig config() {
public UdtServerChannelConfig config() {
return config;
}

View File

@ -21,7 +21,9 @@ import com.barchart.udt.TypeUDT;
* Byte Channel Rendezvous for UDT Streams.
*/
public class NioUdtByteRendezvousChannel extends NioUdtByteConnectorChannel {
public NioUdtByteRendezvousChannel() {
super(NioUdtProvider.newRendezvousChannelUDT(TypeUDT.STREAM));
}
}

View File

@ -26,6 +26,7 @@ import com.barchart.udt.nio.SocketChannelUDT;
import io.netty.bootstrap.ChannelFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.udt.UdtServerChannel;
import io.netty.channel.udt.UdtChannel;
import java.io.IOException;
@ -38,20 +39,20 @@ import java.nio.channels.spi.SelectorProvider;
* <p>
* Provides {@link SelectorProvider} for UDT channels.
*/
public final class NioUdtProvider implements ChannelFactory<UdtChannel> {
public final class NioUdtProvider<T extends UdtChannel> implements ChannelFactory<T> {
/**
* {@link ChannelFactory} for UDT Byte Acceptor. See {@link TypeUDT#STREAM}
* and {@link KindUDT#ACCEPTOR}.
*/
public static final ChannelFactory<UdtChannel> BYTE_ACCEPTOR = new NioUdtProvider(
public static final ChannelFactory<UdtServerChannel> BYTE_ACCEPTOR = new NioUdtProvider<UdtServerChannel>(
TypeUDT.STREAM, KindUDT.ACCEPTOR);
/**
* {@link ChannelFactory} for UDT Byte Connector. See {@link TypeUDT#STREAM}
* and {@link KindUDT#CONNECTOR}.
*/
public static final ChannelFactory<UdtChannel> BYTE_CONNECTOR = new NioUdtProvider(
public static final ChannelFactory<UdtChannel> BYTE_CONNECTOR = new NioUdtProvider<UdtChannel>(
TypeUDT.STREAM, KindUDT.CONNECTOR);
/**
@ -64,21 +65,21 @@ public final class NioUdtProvider implements ChannelFactory<UdtChannel> {
* {@link ChannelFactory} for UDT Byte Rendezvous. See
* {@link TypeUDT#STREAM} and {@link KindUDT#RENDEZVOUS}.
*/
public static final ChannelFactory<UdtChannel> BYTE_RENDEZVOUS = new NioUdtProvider(
public static final ChannelFactory<UdtChannel> BYTE_RENDEZVOUS = new NioUdtProvider<UdtChannel>(
TypeUDT.STREAM, KindUDT.RENDEZVOUS);
/**
* {@link ChannelFactory} for UDT Message Acceptor. See
* {@link TypeUDT#DATAGRAM} and {@link KindUDT#ACCEPTOR}.
*/
public static final ChannelFactory<UdtChannel> MESSAGE_ACCEPTOR = new NioUdtProvider(
public static final ChannelFactory<UdtServerChannel> MESSAGE_ACCEPTOR = new NioUdtProvider<UdtServerChannel>(
TypeUDT.DATAGRAM, KindUDT.ACCEPTOR);
/**
* {@link ChannelFactory} for UDT Message Connector. See
* {@link TypeUDT#DATAGRAM} and {@link KindUDT#CONNECTOR}.
*/
public static final ChannelFactory<UdtChannel> MESSAGE_CONNECTOR = new NioUdtProvider(
public static final ChannelFactory<UdtChannel> MESSAGE_CONNECTOR = new NioUdtProvider<UdtChannel>(
TypeUDT.DATAGRAM, KindUDT.CONNECTOR);
/**
@ -91,7 +92,7 @@ public final class NioUdtProvider implements ChannelFactory<UdtChannel> {
* {@link ChannelFactory} for UDT Message Rendezvous. See
* {@link TypeUDT#DATAGRAM} and {@link KindUDT#RENDEZVOUS}.
*/
public static final ChannelFactory<UdtChannel> MESSAGE_RENDEZVOUS = new NioUdtProvider(
public static final ChannelFactory<UdtChannel> MESSAGE_RENDEZVOUS = new NioUdtProvider<UdtChannel>(
TypeUDT.DATAGRAM, KindUDT.RENDEZVOUS);
/**
@ -196,33 +197,34 @@ public final class NioUdtProvider implements ChannelFactory<UdtChannel> {
* Produce new {@link UdtChannel} based on factory {@link #kind()} and
* {@link #type()}
*/
@SuppressWarnings("unchecked")
@Override
public UdtChannel newChannel() {
public T newChannel() {
switch (kind) {
case ACCEPTOR:
switch (type) {
case DATAGRAM:
return new NioUdtMessageAcceptorChannel();
return (T) new NioUdtMessageAcceptorChannel();
case STREAM:
return new NioUdtByteAcceptorChannel();
return (T) new NioUdtByteAcceptorChannel();
default:
throw new IllegalStateException("wrong type=" + type);
}
case CONNECTOR:
switch (type) {
case DATAGRAM:
return new NioUdtMessageConnectorChannel();
return (T) new NioUdtMessageConnectorChannel();
case STREAM:
return new NioUdtByteConnectorChannel();
return (T) new NioUdtByteConnectorChannel();
default:
throw new IllegalStateException("wrong type=" + type);
}
case RENDEZVOUS:
switch (type) {
case DATAGRAM:
return new NioUdtMessageRendezvousChannel();
return (T) new NioUdtMessageRendezvousChannel();
case STREAM:
return new NioUdtByteRendezvousChannel();
return (T) new NioUdtByteRendezvousChannel();
default:
throw new IllegalStateException("wrong type=" + type);
}

View File

@ -16,6 +16,8 @@
package io.netty.channel.udt.nio;
import io.netty.channel.udt.UdtServerChannel;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,6 +43,10 @@ public class NioUdtProviderTest extends AbstractUdtTest {
assertNotNull(NioUdtProvider.MESSAGE_ACCEPTOR.newChannel());
assertNotNull(NioUdtProvider.MESSAGE_CONNECTOR.newChannel());
assertNotNull(NioUdtProvider.MESSAGE_RENDEZVOUS.newChannel());
// acceptor types
assertTrue(NioUdtProvider.BYTE_ACCEPTOR.newChannel() instanceof UdtServerChannel);
assertTrue(NioUdtProvider.MESSAGE_ACCEPTOR.newChannel() instanceof UdtServerChannel);
}