Remove OIO transport (and transports that depend on it). (#8580)

Motivation:

This transport is unique because it uses Java's blocking IO (java.io / java.net) under the hood. However it is not clear if this transport is actually useful so it should be removed.

Modifications:

- Remove OIO transport and RXTX transport which depend on it.
- Remove Oio*Sctp* implementations
- Remove PerThreadEventLoop* which was only used by OIO transport.

Result:

Fixes https://github.com/netty/netty/issues/8510.
This commit is contained in:
Norman Maurer 2018-11-21 15:23:18 +01:00 committed by GitHub
parent 6650b325ad
commit e114d6be46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 9 additions and 5462 deletions

View File

@ -441,13 +441,6 @@
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-transport-rxtx</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-transport-sctp</artifactId>

View File

@ -166,11 +166,6 @@
<artifactId>netty-transport</artifactId>
<version>5.0.0.Final-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-rxtx</artifactId>
<version>5.0.0.Final-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-sctp</artifactId>

View File

@ -109,16 +109,6 @@
<scope>runtime</scope>
</dependency>
<!-- see https://github.com/netty/netty/issues/874 -->
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-transport-rxtx</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>

View File

@ -1,61 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.rxtx;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.rxtx.RxtxChannel;
import io.netty.channel.rxtx.RxtxDeviceAddress;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* Sends one message to a serial device
*/
public final class RxtxClient {
static final String PORT = System.getProperty("port", "/dev/ttyUSB0");
public static void main(String[] args) throws Exception {
EventLoopGroup group = new OioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(RxtxChannel.class)
.handler(new ChannelInitializer<RxtxChannel>() {
@Override
public void initChannel(RxtxChannel ch) throws Exception {
ch.pipeline().addLast(
new LineBasedFrameDecoder(32768),
new StringEncoder(),
new StringDecoder(),
new RxtxClientHandler()
);
}
});
ChannelFuture f = b.connect(new RxtxDeviceAddress(PORT)).sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}

View File

@ -1,37 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.rxtx;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class RxtxClientHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush("AT\n");
}
@Override
public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
if ("OK".equals(msg)) {
System.out.println("Serial port responded to AT");
} else {
System.out.println("Serial port responded with not-OK: " + msg);
}
ctx.close();
}
}

View File

@ -290,7 +290,6 @@
<module>transport-native-unix-common</module>
<module>transport-native-epoll</module>
<module>transport-native-kqueue</module>
<module>transport-rxtx</module>
<module>transport-sctp</module>
<module>handler</module>
<module>handler-proxy</module>
@ -442,12 +441,6 @@
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.rxtx</groupId>
<artifactId>rxtx</artifactId>
<version>2.1.7</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>

View File

@ -145,12 +145,6 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-transport-rxtx</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-transport-sctp</artifactId>

View File

@ -21,7 +21,6 @@ import static org.ops4j.pax.exam.CoreOptions.frameworkProperty;
import static org.ops4j.pax.exam.CoreOptions.junitBundles;
import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
import static org.ops4j.pax.exam.CoreOptions.systemProperty;
import static org.ops4j.pax.exam.CoreOptions.wrappedBundle;
import static org.osgi.framework.Constants.FRAMEWORK_BOOTDELEGATION;
import java.io.BufferedReader;
@ -94,8 +93,6 @@ public class OsgiBundleTest {
options.add(systemProperty("pax.exam.osgi.unresolved.fail").value("true"));
options.addAll(Arrays.asList(junitBundles()));
options.add(wrappedBundle(mavenBundle("org.rxtx", "rxtx").versionAsInProject()));
for (String name : BUNDLES) {
options.add(mavenBundle(GROUP, name).versionAsInProject());
}

View File

@ -19,11 +19,8 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.sctp.nio.NioSctpChannel;
import io.netty.channel.sctp.nio.NioSctpServerChannel;
import io.netty.channel.sctp.oio.OioSctpChannel;
import io.netty.channel.sctp.oio.OioSctpServerChannel;
import io.netty.testsuite.util.TestUtils;
import io.netty.testsuite.transport.TestsuitePermutation.BootstrapComboFactory;
import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory;
@ -41,19 +38,14 @@ public final class SctpTestPermutation {
new NioEventLoopGroup(BOSSES, new DefaultThreadFactory("testsuite-sctp-nio-boss", true));
private static final EventLoopGroup nioWorkerGroup =
new NioEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-sctp-nio-worker", true));
private static final EventLoopGroup oioBossGroup =
new OioEventLoopGroup(Integer.MAX_VALUE, new DefaultThreadFactory("testsuite-sctp-oio-boss", true));
private static final EventLoopGroup oioWorkerGroup =
new OioEventLoopGroup(Integer.MAX_VALUE, new DefaultThreadFactory("testsuite-sctp-oio-worker", true));
static List<BootstrapFactory<ServerBootstrap>> sctpServerChannel() {
if (!TestUtils.isSctpSupported()) {
return Collections.emptyList();
}
List<BootstrapFactory<ServerBootstrap>> list = new ArrayList<BootstrapFactory<ServerBootstrap>>();
// Make the list of ServerBootstrap factories.
list.add(new BootstrapFactory<ServerBootstrap>() {
return Collections.<BootstrapFactory<ServerBootstrap>>singletonList(new BootstrapFactory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().
@ -61,16 +53,6 @@ public final class SctpTestPermutation {
channel(NioSctpServerChannel.class);
}
});
list.add(new BootstrapFactory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().
group(oioBossGroup, oioWorkerGroup).
channel(OioSctpServerChannel.class);
}
});
return list;
}
static List<BootstrapFactory<Bootstrap>> sctpClientChannel() {
@ -78,20 +60,12 @@ public final class SctpTestPermutation {
return Collections.emptyList();
}
List<BootstrapFactory<Bootstrap>> list = new ArrayList<BootstrapFactory<Bootstrap>>();
list.add(new BootstrapFactory<Bootstrap>() {
return Collections.<BootstrapFactory<Bootstrap>>singletonList(new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(nioWorkerGroup).channel(NioSctpChannel.class);
}
});
list.add(new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(oioWorkerGroup).channel(OioSctpChannel.class);
}
});
return list;
}
static List<BootstrapComboFactory<ServerBootstrap, Bootstrap>> sctpChannel() {

View File

@ -21,7 +21,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.oio.OioDatagramChannel;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.util.CharsetUtil;
import io.netty.util.NetUtil;
@ -59,9 +58,7 @@ public class DatagramConnectNotExistsTest extends AbstractClientSocketTest {
Assert.assertTrue(datagramChannel.isActive());
datagramChannel.writeAndFlush(
Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII)).syncUninterruptibly();
if (!(datagramChannel instanceof OioDatagramChannel)) {
Assert.assertTrue(promise.syncUninterruptibly().getNow() instanceof PortUnreachableException);
}
Assert.assertTrue(promise.syncUninterruptibly().getNow() instanceof PortUnreachableException);
} finally {
future.channel().close();
}

View File

@ -23,7 +23,6 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.oio.OioDatagramChannel;
import io.netty.util.NetUtil;
import io.netty.util.internal.SocketUtils;
import org.junit.Test;
@ -63,13 +62,6 @@ public class DatagramMulticastTest extends AbstractDatagramTest {
InetSocketAddress addr = (InetSocketAddress) sc.localAddress();
cb.localAddress(addr.getPort());
if (sc instanceof OioDatagramChannel) {
// skip the test for OIO, as it fails because of
// No route to host which makes no sense.
// Maybe a JDK bug ?
sc.close().awaitUninterruptibly();
return;
}
DatagramChannel cc = (DatagramChannel) cb.bind().sync().channel();
String group = "230.0.0.1";

View File

@ -25,7 +25,6 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.oio.OioSocketChannel;
import org.junit.Test;
import java.net.ServerSocket;
@ -43,7 +42,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeFalse;
public class SocketShutdownOutputBySelfTest extends AbstractClientSocketTest {
@ -145,7 +143,6 @@ public class SocketShutdownOutputBySelfTest extends AbstractClientSocketTest {
ss.bind(newSocketAddress());
cb.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(2, 4));
ch = (SocketChannel) cb.handler(h).connect(ss.getLocalSocketAddress()).sync().channel();
assumeFalse(ch instanceof OioSocketChannel);
assertTrue(ch.isActive());
assertFalse(ch.isOutputShutdown());

View File

@ -20,17 +20,12 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.socket.oio.OioDatagramChannel;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import io.netty.channel.socket.oio.OioSocketChannel;
import io.netty.testsuite.transport.TestsuitePermutation.BootstrapComboFactory;
import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory;
import io.netty.util.concurrent.DefaultThreadFactory;
@ -39,7 +34,7 @@ import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class SocketTestPermutation {
@ -58,16 +53,10 @@ public class SocketTestPermutation {
protected static final int BOSSES = 2;
protected static final int WORKERS = 3;
protected static final int OIO_SO_TIMEOUT = 10; // Use short timeout for faster runs.
protected final EventLoopGroup nioBossGroup =
new NioEventLoopGroup(BOSSES, new DefaultThreadFactory("testsuite-nio-boss", true));
protected final EventLoopGroup nioWorkerGroup =
new NioEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-nio-worker", true));
protected final EventLoopGroup oioBossGroup =
new OioEventLoopGroup(Integer.MAX_VALUE, new DefaultThreadFactory("testsuite-oio-boss", true));
protected final EventLoopGroup oioWorkerGroup =
new OioEventLoopGroup(Integer.MAX_VALUE, new DefaultThreadFactory("testsuite-oio-worker", true));
protected <A extends AbstractBootstrap<?, ?>, B extends AbstractBootstrap<?, ?>>
List<BootstrapComboFactory<A, B>> combo(List<BootstrapFactory<A>> sbfs, List<BootstrapFactory<B>> cbfs) {
@ -104,17 +93,12 @@ public class SocketTestPermutation {
List<BootstrapFactory<Bootstrap>> cbfs = clientSocket();
// Populate the combinations
List<BootstrapComboFactory<ServerBootstrap, Bootstrap>> list = combo(sbfs, cbfs);
// Remove the OIO-OIO case which often leads to a dead lock by its nature.
list.remove(list.size() - 1);
return list;
return combo(sbfs, cbfs);
}
public List<BootstrapComboFactory<Bootstrap, Bootstrap>> datagram() {
// Make the list of Bootstrap factories.
List<BootstrapFactory<Bootstrap>> bfs = Arrays.asList(
List<BootstrapFactory<Bootstrap>> bfs = Collections.<BootstrapFactory<Bootstrap>>singletonList(
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
@ -130,13 +114,6 @@ public class SocketTestPermutation {
}
});
}
},
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(oioWorkerGroup).channel(OioDatagramChannel.class)
.option(ChannelOption.SO_TIMEOUT, OIO_SO_TIMEOUT);
}
}
);
@ -145,57 +122,35 @@ public class SocketTestPermutation {
}
public List<BootstrapFactory<ServerBootstrap>> serverSocket() {
return Arrays.asList(
return Collections.<BootstrapFactory<ServerBootstrap>>singletonList(
new BootstrapFactory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().group(nioBossGroup, nioWorkerGroup)
.channel(NioServerSocketChannel.class);
}
},
new BootstrapFactory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().group(oioBossGroup, oioWorkerGroup)
.channel(OioServerSocketChannel.class)
.option(ChannelOption.SO_TIMEOUT, OIO_SO_TIMEOUT);
}
}
);
}
public List<BootstrapFactory<Bootstrap>> clientSocket() {
return Arrays.asList(
return Collections.<BootstrapFactory<Bootstrap>>singletonList(
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class);
}
},
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(oioWorkerGroup).channel(OioSocketChannel.class)
.option(ChannelOption.SO_TIMEOUT, OIO_SO_TIMEOUT);
}
}
);
}
public List<BootstrapFactory<Bootstrap>> datagramSocket() {
return Arrays.asList(
return Collections.<BootstrapFactory<Bootstrap>>singletonList(
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(nioWorkerGroup).channel(NioDatagramChannel.class);
}
},
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(oioWorkerGroup).channel(OioDatagramChannel.class)
.option(ChannelOption.SO_TIMEOUT, OIO_SO_TIMEOUT);
}
}
);
}

View File

@ -1,54 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2013 The Netty Project
~
~ The Netty Project licenses this file to you under the Apache License,
~ version 2.0 (the "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at:
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>5.0.0.Final-SNAPSHOT</version>
</parent>
<artifactId>netty-transport-rxtx</artifactId>
<packaging>jar</packaging>
<name>Netty/Transport/RXTX</name>
<properties>
<javaModuleName>io.netty.transport.rxtx</javaModuleName>
</properties>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-buffer</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-transport</artifactId>
<version>${project.version}</version>
</dependency>
<!-- FIXME find/make osgi bundle -->
<dependency>
<groupId>org.rxtx</groupId>
<artifactId>rxtx</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -1,280 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.rxtx;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.PreferHeapByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import java.util.Map;
import static io.netty.channel.rxtx.RxtxChannelOption.BAUD_RATE;
import static io.netty.channel.rxtx.RxtxChannelOption.DATA_BITS;
import static io.netty.channel.rxtx.RxtxChannelOption.DTR;
import static io.netty.channel.rxtx.RxtxChannelOption.PARITY_BIT;
import static io.netty.channel.rxtx.RxtxChannelOption.READ_TIMEOUT;
import static io.netty.channel.rxtx.RxtxChannelOption.RTS;
import static io.netty.channel.rxtx.RxtxChannelOption.STOP_BITS;
import static io.netty.channel.rxtx.RxtxChannelOption.WAIT_TIME;
/**
* Default configuration class for RXTX device connections.
*
* @deprecated this transport will be removed in the next major version.
*/
@Deprecated
final class DefaultRxtxChannelConfig extends DefaultChannelConfig implements RxtxChannelConfig {
private volatile int baudrate = 115200;
private volatile boolean dtr;
private volatile boolean rts;
private volatile Stopbits stopbits = Stopbits.STOPBITS_1;
private volatile Databits databits = Databits.DATABITS_8;
private volatile Paritybit paritybit = Paritybit.NONE;
private volatile int waitTime;
private volatile int readTimeout = 1000;
DefaultRxtxChannelConfig(RxtxChannel channel) {
super(channel);
setAllocator(new PreferHeapByteBufAllocator(getAllocator()));
}
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), BAUD_RATE, DTR, RTS, STOP_BITS, DATA_BITS, PARITY_BIT, WAIT_TIME);
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == BAUD_RATE) {
return (T) Integer.valueOf(getBaudrate());
}
if (option == DTR) {
return (T) Boolean.valueOf(isDtr());
}
if (option == RTS) {
return (T) Boolean.valueOf(isRts());
}
if (option == STOP_BITS) {
return (T) getStopbits();
}
if (option == DATA_BITS) {
return (T) getDatabits();
}
if (option == PARITY_BIT) {
return (T) getParitybit();
}
if (option == WAIT_TIME) {
return (T) Integer.valueOf(getWaitTimeMillis());
}
if (option == READ_TIMEOUT) {
return (T) Integer.valueOf(getReadTimeout());
}
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == BAUD_RATE) {
setBaudrate((Integer) value);
} else if (option == DTR) {
setDtr((Boolean) value);
} else if (option == RTS) {
setRts((Boolean) value);
} else if (option == STOP_BITS) {
setStopbits((Stopbits) value);
} else if (option == DATA_BITS) {
setDatabits((Databits) value);
} else if (option == PARITY_BIT) {
setParitybit((Paritybit) value);
} else if (option == WAIT_TIME) {
setWaitTimeMillis((Integer) value);
} else if (option == READ_TIMEOUT) {
setReadTimeout((Integer) value);
} else {
return super.setOption(option, value);
}
return true;
}
@Override
public RxtxChannelConfig setBaudrate(final int baudrate) {
this.baudrate = baudrate;
return this;
}
@Override
public RxtxChannelConfig setStopbits(final Stopbits stopbits) {
this.stopbits = stopbits;
return this;
}
@Override
public RxtxChannelConfig setDatabits(final Databits databits) {
this.databits = databits;
return this;
}
@Override
public RxtxChannelConfig setParitybit(final Paritybit paritybit) {
this.paritybit = paritybit;
return this;
}
@Override
public int getBaudrate() {
return baudrate;
}
@Override
public Stopbits getStopbits() {
return stopbits;
}
@Override
public Databits getDatabits() {
return databits;
}
@Override
public Paritybit getParitybit() {
return paritybit;
}
@Override
public boolean isDtr() {
return dtr;
}
@Override
public RxtxChannelConfig setDtr(final boolean dtr) {
this.dtr = dtr;
return this;
}
@Override
public boolean isRts() {
return rts;
}
@Override
public RxtxChannelConfig setRts(final boolean rts) {
this.rts = rts;
return this;
}
@Override
public int getWaitTimeMillis() {
return waitTime;
}
@Override
public RxtxChannelConfig setWaitTimeMillis(final int waitTimeMillis) {
if (waitTimeMillis < 0) {
throw new IllegalArgumentException("Wait time must be >= 0");
}
waitTime = waitTimeMillis;
return this;
}
@Override
public RxtxChannelConfig setReadTimeout(int readTimeout) {
if (readTimeout < 0) {
throw new IllegalArgumentException("readTime must be >= 0");
}
this.readTimeout = readTimeout;
return this;
}
@Override
public int getReadTimeout() {
return readTimeout;
}
@Override
public RxtxChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
}
@Override
@Deprecated
public RxtxChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public RxtxChannelConfig setWriteSpinCount(int writeSpinCount) {
super.setWriteSpinCount(writeSpinCount);
return this;
}
@Override
public RxtxChannelConfig setAllocator(ByteBufAllocator allocator) {
super.setAllocator(allocator);
return this;
}
@Override
public RxtxChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
super.setRecvByteBufAllocator(allocator);
return this;
}
@Override
public RxtxChannelConfig setAutoRead(boolean autoRead) {
super.setAutoRead(autoRead);
return this;
}
@Override
public RxtxChannelConfig setAutoClose(boolean autoClose) {
super.setAutoClose(autoClose);
return this;
}
@Override
public RxtxChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
@Override
public RxtxChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
@Override
public RxtxChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
super.setWriteBufferWaterMark(writeBufferWaterMark);
return this;
}
@Override
public RxtxChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
}

View File

@ -1,195 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.rxtx;
import gnu.io.CommPort;
import gnu.io.CommPortIdentifier;
import gnu.io.SerialPort;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.oio.OioByteStreamChannel;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import static io.netty.channel.rxtx.RxtxChannelOption.BAUD_RATE;
import static io.netty.channel.rxtx.RxtxChannelOption.DATA_BITS;
import static io.netty.channel.rxtx.RxtxChannelOption.DTR;
import static io.netty.channel.rxtx.RxtxChannelOption.PARITY_BIT;
import static io.netty.channel.rxtx.RxtxChannelOption.READ_TIMEOUT;
import static io.netty.channel.rxtx.RxtxChannelOption.RTS;
import static io.netty.channel.rxtx.RxtxChannelOption.STOP_BITS;
import static io.netty.channel.rxtx.RxtxChannelOption.WAIT_TIME;
/**
* A channel to a serial device using the RXTX library.
*
* @deprecated this transport will be removed in the next major version.
*/
@Deprecated
public class RxtxChannel extends OioByteStreamChannel {
private static final RxtxDeviceAddress LOCAL_ADDRESS = new RxtxDeviceAddress("localhost");
private final RxtxChannelConfig config;
private boolean open = true;
private RxtxDeviceAddress deviceAddress;
private SerialPort serialPort;
public RxtxChannel() {
super(null);
config = new DefaultRxtxChannelConfig(this);
}
@Override
public RxtxChannelConfig config() {
return config;
}
@Override
public boolean isOpen() {
return open;
}
@Override
protected AbstractUnsafe newUnsafe() {
return new RxtxUnsafe();
}
@Override
protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
RxtxDeviceAddress remote = (RxtxDeviceAddress) remoteAddress;
final CommPortIdentifier cpi = CommPortIdentifier.getPortIdentifier(remote.value());
final CommPort commPort = cpi.open(getClass().getName(), 1000);
commPort.enableReceiveTimeout(config().getOption(READ_TIMEOUT));
deviceAddress = remote;
serialPort = (SerialPort) commPort;
}
protected void doInit() throws Exception {
serialPort.setSerialPortParams(
config().getOption(BAUD_RATE),
config().getOption(DATA_BITS).value(),
config().getOption(STOP_BITS).value(),
config().getOption(PARITY_BIT).value()
);
serialPort.setDTR(config().getOption(DTR));
serialPort.setRTS(config().getOption(RTS));
activate(serialPort.getInputStream(), serialPort.getOutputStream());
}
@Override
public RxtxDeviceAddress localAddress() {
return (RxtxDeviceAddress) super.localAddress();
}
@Override
public RxtxDeviceAddress remoteAddress() {
return (RxtxDeviceAddress) super.remoteAddress();
}
@Override
protected RxtxDeviceAddress localAddress0() {
return LOCAL_ADDRESS;
}
@Override
protected RxtxDeviceAddress remoteAddress0() {
return deviceAddress;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doDisconnect() throws Exception {
doClose();
}
@Override
protected void doClose() throws Exception {
open = false;
try {
super.doClose();
} finally {
if (serialPort != null) {
serialPort.removeEventListener();
serialPort.close();
serialPort = null;
}
}
}
@Override
protected boolean isInputShutdown() {
return !open;
}
@Override
protected ChannelFuture shutdownInput() {
return newFailedFuture(new UnsupportedOperationException("shutdownInput"));
}
private final class RxtxUnsafe extends AbstractUnsafe {
@Override
public void connect(
final SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
final boolean wasActive = isActive();
doConnect(remoteAddress, localAddress);
int waitTime = config().getOption(WAIT_TIME);
if (waitTime > 0) {
eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
doInit();
safeSetSuccess(promise);
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
}
}
}, waitTime, TimeUnit.MILLISECONDS);
} else {
doInit();
safeSetSuccess(promise);
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
}
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
}
}
}
}

View File

@ -1,310 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.rxtx;
import gnu.io.SerialPort;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
/**
* A configuration class for RXTX device connections.
*
* <h3>Available options</h3>
*
* In addition to the options provided by {@link ChannelConfig},
* {@link DefaultRxtxChannelConfig} allows the following options in the option map:
*
* <table border="1" cellspacing="0" cellpadding="6">
* <tr>
* <th>Name</th><th>Associated setter method</th>
* </tr><tr>
* <td>{@link RxtxChannelOption#BAUD_RATE}</td><td>{@link #setBaudrate(int)}</td>
* </tr><tr>
* <td>{@link RxtxChannelOption#DTR}</td><td>{@link #setDtr(boolean)}</td>
* </tr><tr>
* <td>{@link RxtxChannelOption#RTS}</td><td>{@link #setRts(boolean)}</td>
* </tr><tr>
* <td>{@link RxtxChannelOption#STOP_BITS}</td><td>{@link #setStopbits(Stopbits)}</td>
* </tr><tr>
* <td>{@link RxtxChannelOption#DATA_BITS}</td><td>{@link #setDatabits(Databits)}</td>
* </tr><tr>
* <td>{@link RxtxChannelOption#PARITY_BIT}</td><td>{@link #setParitybit(Paritybit)}</td>
* </tr><tr>
* <td>{@link RxtxChannelOption#WAIT_TIME}</td><td>{@link #setWaitTimeMillis(int)}</td>
* </tr>
* </table>
*
* @deprecated this transport will be removed in the next major version.
*/
@Deprecated
public interface RxtxChannelConfig extends ChannelConfig {
enum Stopbits {
/**
* 1 stop bit will be sent at the end of every character
*/
STOPBITS_1(SerialPort.STOPBITS_1),
/**
* 2 stop bits will be sent at the end of every character
*/
STOPBITS_2(SerialPort.STOPBITS_2),
/**
* 1.5 stop bits will be sent at the end of every character
*/
STOPBITS_1_5(SerialPort.STOPBITS_1_5);
private final int value;
Stopbits(int value) {
this.value = value;
}
public int value() {
return value;
}
public static Stopbits valueOf(int value) {
for (Stopbits stopbit : Stopbits.values()) {
if (stopbit.value == value) {
return stopbit;
}
}
throw new IllegalArgumentException("unknown " + Stopbits.class.getSimpleName() + " value: " + value);
}
}
enum Databits {
/**
* 5 data bits will be used for each character (ie. Baudot code)
*/
DATABITS_5(SerialPort.DATABITS_5),
/**
* 6 data bits will be used for each character
*/
DATABITS_6(SerialPort.DATABITS_6),
/**
* 7 data bits will be used for each character (ie. ASCII)
*/
DATABITS_7(SerialPort.DATABITS_7),
/**
* 8 data bits will be used for each character (ie. binary data)
*/
DATABITS_8(SerialPort.DATABITS_8);
private final int value;
Databits(int value) {
this.value = value;
}
public int value() {
return value;
}
public static Databits valueOf(int value) {
for (Databits databit : Databits.values()) {
if (databit.value == value) {
return databit;
}
}
throw new IllegalArgumentException("unknown " + Databits.class.getSimpleName() + " value: " + value);
}
}
enum Paritybit {
/**
* No parity bit will be sent with each data character at all
*/
NONE(SerialPort.PARITY_NONE),
/**
* An odd parity bit will be sent with each data character, ie. will be set
* to 1 if the data character contains an even number of bits set to 1.
*/
ODD(SerialPort.PARITY_ODD),
/**
* An even parity bit will be sent with each data character, ie. will be set
* to 1 if the data character contains an odd number of bits set to 1.
*/
EVEN(SerialPort.PARITY_EVEN),
/**
* A mark parity bit (ie. always 1) will be sent with each data character
*/
MARK(SerialPort.PARITY_MARK),
/**
* A space parity bit (ie. always 0) will be sent with each data character
*/
SPACE(SerialPort.PARITY_SPACE);
private final int value;
Paritybit(int value) {
this.value = value;
}
public int value() {
return value;
}
public static Paritybit valueOf(int value) {
for (Paritybit paritybit : Paritybit.values()) {
if (paritybit.value == value) {
return paritybit;
}
}
throw new IllegalArgumentException("unknown " + Paritybit.class.getSimpleName() + " value: " + value);
}
}
/**
* Sets the baud rate (ie. bits per second) for communication with the serial device.
* The baud rate will include bits for framing (in the form of stop bits and parity),
* such that the effective data rate will be lower than this value.
*
* @param baudrate The baud rate (in bits per second)
*/
RxtxChannelConfig setBaudrate(int baudrate);
/**
* Sets the number of stop bits to include at the end of every character to aid the
* serial device in synchronising with the data.
*
* @param stopbits The number of stop bits to use
*/
RxtxChannelConfig setStopbits(Stopbits stopbits);
/**
* Sets the number of data bits to use to make up each character sent to the serial
* device.
*
* @param databits The number of data bits to use
*/
RxtxChannelConfig setDatabits(Databits databits);
/**
* Sets the type of parity bit to be used when communicating with the serial device.
*
* @param paritybit The type of parity bit to be used
*/
RxtxChannelConfig setParitybit(Paritybit paritybit);
/**
* @return The configured baud rate, defaulting to 115200 if unset
*/
int getBaudrate();
/**
* @return The configured stop bits, defaulting to {@link Stopbits#STOPBITS_1} if unset
*/
Stopbits getStopbits();
/**
* @return The configured data bits, defaulting to {@link Databits#DATABITS_8} if unset
*/
Databits getDatabits();
/**
* @return The configured parity bit, defaulting to {@link Paritybit#NONE} if unset
*/
Paritybit getParitybit();
/**
* @return true if the serial device should support the Data Terminal Ready signal
*/
boolean isDtr();
/**
* Sets whether the serial device supports the Data Terminal Ready signal, used for
* flow control
*
* @param dtr true if DTR is supported, false otherwise
*/
RxtxChannelConfig setDtr(boolean dtr);
/**
* @return true if the serial device should support the Ready to Send signal
*/
boolean isRts();
/**
* Sets whether the serial device supports the Request To Send signal, used for flow
* control
*
* @param rts true if RTS is supported, false otherwise
*/
RxtxChannelConfig setRts(boolean rts);
/**
* @return The number of milliseconds to wait between opening the serial port and
* initialising.
*/
int getWaitTimeMillis();
/**
* Sets the time to wait after opening the serial port and before sending it any
* configuration information or data. A value of 0 indicates that no waiting should
* occur.
*
* @param waitTimeMillis The number of milliseconds to wait, defaulting to 0 (no
* wait) if unset
* @throws IllegalArgumentException if the supplied value is &lt; 0
*/
RxtxChannelConfig setWaitTimeMillis(int waitTimeMillis);
/**
* Sets the maximal time (in ms) to block while try to read from the serial port. Default is 1000ms
*/
RxtxChannelConfig setReadTimeout(int readTimeout);
/**
* Return the maximal time (in ms) to block and wait for something to be ready to read.
*/
int getReadTimeout();
@Override
RxtxChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override
@Deprecated
RxtxChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override
RxtxChannelConfig setWriteSpinCount(int writeSpinCount);
@Override
RxtxChannelConfig setAllocator(ByteBufAllocator allocator);
@Override
RxtxChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator);
@Override
RxtxChannelConfig setAutoRead(boolean autoRead);
@Override
RxtxChannelConfig setAutoClose(boolean autoClose);
@Override
RxtxChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
@Override
RxtxChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
@Override
RxtxChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark);
@Override
RxtxChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);
}

View File

@ -1,44 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.rxtx;
import io.netty.channel.ChannelOption;
import io.netty.channel.rxtx.RxtxChannelConfig.Databits;
import io.netty.channel.rxtx.RxtxChannelConfig.Paritybit;
import io.netty.channel.rxtx.RxtxChannelConfig.Stopbits;
/**
* Option for configuring a serial port connection
*
* @deprecated this transport will be removed in the next major version.
*/
@Deprecated
public final class RxtxChannelOption<T> extends ChannelOption<T> {
public static final ChannelOption<Integer> BAUD_RATE = valueOf(RxtxChannelOption.class, "BAUD_RATE");
public static final ChannelOption<Boolean> DTR = valueOf(RxtxChannelOption.class, "DTR");
public static final ChannelOption<Boolean> RTS = valueOf(RxtxChannelOption.class, "RTS");
public static final ChannelOption<Stopbits> STOP_BITS = valueOf(RxtxChannelOption.class, "STOP_BITS");
public static final ChannelOption<Databits> DATA_BITS = valueOf(RxtxChannelOption.class, "DATA_BITS");
public static final ChannelOption<Paritybit> PARITY_BIT = valueOf(RxtxChannelOption.class, "PARITY_BIT");
public static final ChannelOption<Integer> WAIT_TIME = valueOf(RxtxChannelOption.class, "WAIT_TIME");
public static final ChannelOption<Integer> READ_TIMEOUT = valueOf(RxtxChannelOption.class, "READ_TIMEOUT");
@SuppressWarnings({ "unused", "deprecation" })
private RxtxChannelOption() {
super(null);
}
}

View File

@ -1,48 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.rxtx;
import java.net.SocketAddress;
/**
* A {@link SocketAddress} subclass to wrap the serial port address of a RXTX
* device (e.g. COM1, /dev/ttyUSB0).
*
* @deprecated this transport will be removed in the next major version.
*/
@Deprecated
public class RxtxDeviceAddress extends SocketAddress {
private static final long serialVersionUID = -2907820090993709523L;
private final String value;
/**
* Creates a RxtxDeviceAddress representing the address of the serial port.
*
* @param value the address of the device (e.g. COM1, /dev/ttyUSB0, ...)
*/
public RxtxDeviceAddress(String value) {
this.value = value;
}
/**
* @return The serial port address of the device (e.g. COM1, /dev/ttyUSB0, ...)
*/
public String value() {
return value;
}
}

View File

@ -1,22 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* A serial and parallel port communication transport based on <a href="http://rxtx.qbang.org/">RXTX</a>.
*
* @deprecated this transport will be removed in the next major version.
*/
package io.netty.channel.rxtx;

View File

@ -1,474 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.sctp.oio;
import com.sun.nio.sctp.Association;
import com.sun.nio.sctp.MessageInfo;
import com.sun.nio.sctp.NotificationHandler;
import com.sun.nio.sctp.SctpChannel;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.oio.AbstractOioMessageChannel;
import io.netty.channel.sctp.DefaultSctpChannelConfig;
import io.netty.channel.sctp.SctpChannelConfig;
import io.netty.channel.sctp.SctpMessage;
import io.netty.channel.sctp.SctpNotificationHandler;
import io.netty.channel.sctp.SctpServerChannel;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
/**
* {@link io.netty.channel.sctp.SctpChannel} implementation which use blocking mode and allows to read / write
* {@link SctpMessage}s to the underlying {@link SctpChannel}.
*
* Be aware that not all operations systems support SCTP. Please refer to the documentation of your operation system,
* to understand what you need to do to use it. Also this feature is only supported on Java 7+.
*
* @deprecated use {@link io.netty.channel.sctp.nio.NioSctpChannel}.
*/
@Deprecated
public class OioSctpChannel extends AbstractOioMessageChannel
implements io.netty.channel.sctp.SctpChannel {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(OioSctpChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private static final String EXPECTED_TYPE = " (expected: " + StringUtil.simpleClassName(SctpMessage.class) + ')';
private final SctpChannel ch;
private final SctpChannelConfig config;
private final Selector readSelector;
private final Selector writeSelector;
private final Selector connectSelector;
private final NotificationHandler<?> notificationHandler;
private static SctpChannel openChannel() {
try {
return SctpChannel.open();
} catch (IOException e) {
throw new ChannelException("Failed to open a sctp channel.", e);
}
}
/**
* Create a new instance with an new {@link SctpChannel}.
*/
public OioSctpChannel() {
this(openChannel());
}
/**
* Create a new instance from the given {@link SctpChannel}.
*
* @param ch the {@link SctpChannel} which is used by this instance
*/
public OioSctpChannel(SctpChannel ch) {
this(null, ch);
}
/**
* Create a new instance from the given {@link SctpChannel}.
*
* @param parent the parent {@link Channel} which was used to create this instance. This can be null if the
* {@link} has no parent as it was created by your self.
* @param ch the {@link SctpChannel} which is used by this instance
*/
public OioSctpChannel(Channel parent, SctpChannel ch) {
super(parent);
this.ch = ch;
boolean success = false;
try {
ch.configureBlocking(false);
readSelector = Selector.open();
writeSelector = Selector.open();
connectSelector = Selector.open();
ch.register(readSelector, SelectionKey.OP_READ);
ch.register(writeSelector, SelectionKey.OP_WRITE);
ch.register(connectSelector, SelectionKey.OP_CONNECT);
config = new OioSctpChannelConfig(this, ch);
notificationHandler = new SctpNotificationHandler(this);
success = true;
} catch (Exception e) {
throw new ChannelException("failed to initialize a sctp channel", e);
} finally {
if (!success) {
try {
ch.close();
} catch (IOException e) {
logger.warn("Failed to close a sctp channel.", e);
}
}
}
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
}
@Override
public SctpServerChannel parent() {
return (SctpServerChannel) super.parent();
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
public SctpChannelConfig config() {
return config;
}
@Override
public boolean isOpen() {
return ch.isOpen();
}
@Override
protected int doReadMessages(List<Object> msgs) throws Exception {
if (!readSelector.isOpen()) {
return 0;
}
int readMessages = 0;
final int selectedKeys = readSelector.select(SO_TIMEOUT);
final boolean keysSelected = selectedKeys > 0;
if (!keysSelected) {
return readMessages;
}
// We must clear the selectedKeys because the Selector will never do it. If we do not clear it, the selectionKey
// will always be returned even if there is no data can be read which causes performance issue. And in some
// implementation of Selector, the select method may return 0 if the selectionKey which is ready for process has
// already been in the selectedKeys and cause the keysSelected above to be false even if we actually have
// something to read.
readSelector.selectedKeys().clear();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
ByteBuf buffer = allocHandle.allocate(config().getAllocator());
boolean free = true;
try {
ByteBuffer data = buffer.nioBuffer(buffer.writerIndex(), buffer.writableBytes());
MessageInfo messageInfo = ch.receive(data, null, notificationHandler);
if (messageInfo == null) {
return readMessages;
}
data.flip();
allocHandle.lastBytesRead(data.remaining());
msgs.add(new SctpMessage(messageInfo,
buffer.writerIndex(buffer.writerIndex() + allocHandle.lastBytesRead())));
free = false;
++readMessages;
} catch (Throwable cause) {
PlatformDependent.throwException(cause);
} finally {
if (free) {
buffer.release();
}
}
return readMessages;
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
if (!writeSelector.isOpen()) {
return;
}
final int size = in.size();
final int selectedKeys = writeSelector.select(SO_TIMEOUT);
if (selectedKeys > 0) {
final Set<SelectionKey> writableKeys = writeSelector.selectedKeys();
if (writableKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> writableKeysIt = writableKeys.iterator();
int written = 0;
for (;;) {
if (written == size) {
// all written
return;
}
writableKeysIt.next();
writableKeysIt.remove();
SctpMessage packet = (SctpMessage) in.current();
if (packet == null) {
return;
}
ByteBuf data = packet.content();
int dataLen = data.readableBytes();
ByteBuffer nioData;
if (data.nioBufferCount() != -1) {
nioData = data.nioBuffer();
} else {
nioData = ByteBuffer.allocate(dataLen);
data.getBytes(data.readerIndex(), nioData);
nioData.flip();
}
final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
mi.payloadProtocolID(packet.protocolIdentifier());
mi.streamNumber(packet.streamIdentifier());
mi.unordered(packet.isUnordered());
ch.send(nioData, mi);
written ++;
in.remove();
if (!writableKeysIt.hasNext()) {
return;
}
}
}
}
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
if (msg instanceof SctpMessage) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPE);
}
@Override
public Association association() {
try {
return ch.association();
} catch (IOException ignored) {
return null;
}
}
@Override
public boolean isActive() {
return isOpen() && association() != null;
}
@Override
protected SocketAddress localAddress0() {
try {
Iterator<SocketAddress> i = ch.getAllLocalAddresses().iterator();
if (i.hasNext()) {
return i.next();
}
} catch (IOException e) {
// ignore
}
return null;
}
@Override
public Set<InetSocketAddress> allLocalAddresses() {
try {
final Set<SocketAddress> allLocalAddresses = ch.getAllLocalAddresses();
final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
for (SocketAddress socketAddress : allLocalAddresses) {
addresses.add((InetSocketAddress) socketAddress);
}
return addresses;
} catch (Throwable ignored) {
return Collections.emptySet();
}
}
@Override
protected SocketAddress remoteAddress0() {
try {
Iterator<SocketAddress> i = ch.getRemoteAddresses().iterator();
if (i.hasNext()) {
return i.next();
}
} catch (IOException e) {
// ignore
}
return null;
}
@Override
public Set<InetSocketAddress> allRemoteAddresses() {
try {
final Set<SocketAddress> allLocalAddresses = ch.getRemoteAddresses();
final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
for (SocketAddress socketAddress : allLocalAddresses) {
addresses.add((InetSocketAddress) socketAddress);
}
return addresses;
} catch (Throwable ignored) {
return Collections.emptySet();
}
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
ch.bind(localAddress);
}
@Override
protected void doConnect(SocketAddress remoteAddress,
SocketAddress localAddress) throws Exception {
if (localAddress != null) {
ch.bind(localAddress);
}
boolean success = false;
try {
ch.connect(remoteAddress);
boolean finishConnect = false;
while (!finishConnect) {
if (connectSelector.select(SO_TIMEOUT) >= 0) {
final Set<SelectionKey> selectionKeys = connectSelector.selectedKeys();
for (SelectionKey key : selectionKeys) {
if (key.isConnectable()) {
selectionKeys.clear();
finishConnect = true;
break;
}
}
selectionKeys.clear();
}
}
success = ch.finishConnect();
} finally {
if (!success) {
doClose();
}
}
}
@Override
protected void doDisconnect() throws Exception {
doClose();
}
@Override
protected void doClose() throws Exception {
closeSelector("read", readSelector);
closeSelector("write", writeSelector);
closeSelector("connect", connectSelector);
ch.close();
}
private static void closeSelector(String selectorName, Selector selector) {
try {
selector.close();
} catch (IOException e) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a " + selectorName + " selector.", e);
}
}
}
@Override
public ChannelFuture bindAddress(InetAddress localAddress) {
return bindAddress(localAddress, newPromise());
}
@Override
public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
if (eventLoop().inEventLoop()) {
try {
ch.bindAddress(localAddress);
promise.setSuccess();
} catch (Throwable t) {
promise.setFailure(t);
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
bindAddress(localAddress, promise);
}
});
}
return promise;
}
@Override
public ChannelFuture unbindAddress(InetAddress localAddress) {
return unbindAddress(localAddress, newPromise());
}
@Override
public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelPromise promise) {
if (eventLoop().inEventLoop()) {
try {
ch.unbindAddress(localAddress);
promise.setSuccess();
} catch (Throwable t) {
promise.setFailure(t);
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
unbindAddress(localAddress, promise);
}
});
}
return promise;
}
private final class OioSctpChannelConfig extends DefaultSctpChannelConfig {
private OioSctpChannelConfig(OioSctpChannel channel, SctpChannel javaChannel) {
super(channel, javaChannel);
}
@Override
protected void autoReadCleared() {
clearReadPending();
}
}
}

View File

@ -1,311 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.sctp.oio;
import com.sun.nio.sctp.SctpChannel;
import com.sun.nio.sctp.SctpServerChannel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.oio.AbstractOioMessageChannel;
import io.netty.channel.sctp.DefaultSctpServerChannelConfig;
import io.netty.channel.sctp.SctpServerChannelConfig;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
/**
* {@link io.netty.channel.sctp.SctpServerChannel} implementation which use blocking mode to accept new
* connections and create the {@link OioSctpChannel} for them.
*
* Be aware that not all operations systems support SCTP. Please refer to the documentation of your operation system,
* to understand what you need to do to use it. Also this feature is only supported on Java 7+.
*
* @deprecated use {@link io.netty.channel.sctp.nio.NioSctpServerChannel}.
*/
@Deprecated
public class OioSctpServerChannel extends AbstractOioMessageChannel
implements io.netty.channel.sctp.SctpServerChannel {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(OioSctpServerChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 1);
private static SctpServerChannel newServerSocket() {
try {
return SctpServerChannel.open();
} catch (IOException e) {
throw new ChannelException("failed to create a sctp server channel", e);
}
}
private final SctpServerChannel sch;
private final SctpServerChannelConfig config;
private final Selector selector;
/**
* Create a new instance with an new {@link SctpServerChannel}
*/
public OioSctpServerChannel() {
this(newServerSocket());
}
/**
* Create a new instance from the given {@link SctpServerChannel}
*
* @param sch the {@link SctpServerChannel} which is used by this instance
*/
public OioSctpServerChannel(SctpServerChannel sch) {
super(null);
if (sch == null) {
throw new NullPointerException("sctp server channel");
}
this.sch = sch;
boolean success = false;
try {
sch.configureBlocking(false);
selector = Selector.open();
sch.register(selector, SelectionKey.OP_ACCEPT);
config = new OioSctpServerChannelConfig(this, sch);
success = true;
} catch (Exception e) {
throw new ChannelException("failed to initialize a sctp server channel", e);
} finally {
if (!success) {
try {
sch.close();
} catch (IOException e) {
logger.warn("Failed to close a sctp server channel.", e);
}
}
}
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
public SctpServerChannelConfig config() {
return config;
}
@Override
public InetSocketAddress remoteAddress() {
return null;
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override
public boolean isOpen() {
return sch.isOpen();
}
@Override
protected SocketAddress localAddress0() {
try {
Iterator<SocketAddress> i = sch.getAllLocalAddresses().iterator();
if (i.hasNext()) {
return i.next();
}
} catch (IOException e) {
// ignore
}
return null;
}
@Override
public Set<InetSocketAddress> allLocalAddresses() {
try {
final Set<SocketAddress> allLocalAddresses = sch.getAllLocalAddresses();
final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
for (SocketAddress socketAddress : allLocalAddresses) {
addresses.add((InetSocketAddress) socketAddress);
}
return addresses;
} catch (Throwable ignored) {
return Collections.emptySet();
}
}
@Override
public boolean isActive() {
return isOpen() && localAddress0() != null;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
sch.bind(localAddress, config.getBacklog());
}
@Override
protected void doClose() throws Exception {
try {
selector.close();
} catch (IOException e) {
logger.warn("Failed to close a selector.", e);
}
sch.close();
}
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
if (!isActive()) {
return -1;
}
SctpChannel s = null;
int acceptedChannels = 0;
try {
final int selectedKeys = selector.select(SO_TIMEOUT);
if (selectedKeys > 0) {
final Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
for (;;) {
SelectionKey key = selectionKeys.next();
selectionKeys.remove();
if (key.isAcceptable()) {
s = sch.accept();
if (s != null) {
buf.add(new OioSctpChannel(this, s));
acceptedChannels ++;
}
}
if (!selectionKeys.hasNext()) {
return acceptedChannels;
}
}
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted sctp channel.", t);
if (s != null) {
try {
s.close();
} catch (Throwable t2) {
logger.warn("Failed to close a sctp channel.", t2);
}
}
}
return acceptedChannels;
}
@Override
public ChannelFuture bindAddress(InetAddress localAddress) {
return bindAddress(localAddress, newPromise());
}
@Override
public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
if (eventLoop().inEventLoop()) {
try {
sch.bindAddress(localAddress);
promise.setSuccess();
} catch (Throwable t) {
promise.setFailure(t);
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
bindAddress(localAddress, promise);
}
});
}
return promise;
}
@Override
public ChannelFuture unbindAddress(InetAddress localAddress) {
return unbindAddress(localAddress, newPromise());
}
@Override
public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelPromise promise) {
if (eventLoop().inEventLoop()) {
try {
sch.unbindAddress(localAddress);
promise.setSuccess();
} catch (Throwable t) {
promise.setFailure(t);
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
unbindAddress(localAddress, promise);
}
});
}
return promise;
}
@Override
protected void doConnect(
SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected SocketAddress remoteAddress0() {
return null;
}
@Override
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
private final class OioSctpServerChannelConfig extends DefaultSctpServerChannelConfig {
private OioSctpServerChannelConfig(OioSctpServerChannel channel, SctpServerChannel javaChannel) {
super(channel, javaChannel);
}
@Override
protected void autoReadCleared() {
clearReadPending();
}
}
}

View File

@ -1,24 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Old blocking I/O based SCTP channel API implementation - recommended for
* a small number of connections (&lt; 1000).
*
* @deprecated use NIO based SCTP implementation.
*/
@Deprecated
package io.netty.channel.sctp.oio;

View File

@ -1,98 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
/**
* {@link SingleThreadEventLoop} which is used to handle OIO {@link Channel}'s. So in general there will be
* one {@link ThreadPerChannelEventLoop} per {@link Channel}.
*
* @deprecated this will be remove in the next-major release.
*/
@Deprecated
public class ThreadPerChannelEventLoop extends SingleThreadEventLoop {
private final ThreadPerChannelEventLoopGroup parent;
private Channel ch;
public ThreadPerChannelEventLoop(ThreadPerChannelEventLoopGroup parent) {
super(parent, parent.executor, true);
this.parent = parent;
}
@Override
public ChannelFuture register(ChannelPromise promise) {
return super.register(promise).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ch = future.channel();
} else {
deregister();
}
}
});
}
@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return super.register(channel, promise).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ch = future.channel();
} else {
deregister();
}
}
});
}
@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
Channel ch = this.ch;
if (isShuttingDown()) {
if (ch != null) {
ch.unsafe().close(ch.unsafe().voidPromise());
}
if (confirmShutdown()) {
break;
}
} else {
if (ch != null) {
// Handle deregistration
if (!ch.isRegistered()) {
runAllTasks();
deregister();
}
}
}
}
}
protected void deregister() {
ch = null;
parent.activeChildren.remove(this);
parent.idleChildren.add(this);
}
}

View File

@ -1,328 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.concurrent.AbstractEventExecutorGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ThreadPerTaskExecutor;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ReadOnlyIterator;
import io.netty.util.internal.ThrowableUtil;
import java.util.Collections;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* An {@link EventLoopGroup} that creates one {@link EventLoop} per {@link Channel}.
*
* @deprecated this will be remove in the next-major release.
*/
@Deprecated
public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
private final Object[] childArgs;
private final int maxChannels;
final Executor executor;
final Set<EventLoop> activeChildren =
Collections.newSetFromMap(PlatformDependent.<EventLoop, Boolean>newConcurrentHashMap());
final Queue<EventLoop> idleChildren = new ConcurrentLinkedQueue<EventLoop>();
private final ChannelException tooManyChannels;
private volatile boolean shuttingDown;
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
private final FutureListener<Object> childTerminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
// Inefficient, but works.
if (isTerminated()) {
terminationFuture.trySuccess(null);
}
}
};
/**
* Create a new {@link ThreadPerChannelEventLoopGroup} with no limit in place.
*/
protected ThreadPerChannelEventLoopGroup() {
this(0);
}
/**
* Create a new {@link ThreadPerChannelEventLoopGroup}.
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException}. on the {@link #register(Channel)} and
* {@link #register(ChannelPromise)} method.
* Use {@code 0} to use no limit
*/
protected ThreadPerChannelEventLoopGroup(int maxChannels) {
this(maxChannels, Executors.defaultThreadFactory());
}
/**
* Create a new {@link ThreadPerChannelEventLoopGroup}.
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(ChannelPromise)} method.
* Use {@code 0} to use no limit
* @param threadFactory the {@link ThreadFactory} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s
* @param args arguments which will passed to each {@link #newChild(Object...)} call.
*/
protected ThreadPerChannelEventLoopGroup(int maxChannels, ThreadFactory threadFactory, Object... args) {
this(maxChannels, new ThreadPerTaskExecutor(threadFactory), args);
}
/**
* Create a new {@link ThreadPerChannelEventLoopGroup}.
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(ChannelPromise)} method.
* Use {@code 0} to use no limit
* @param executor the {@link Executor} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s
* @param args arguments which will passed to each {@link #newChild(Object...)} call.
*/
protected ThreadPerChannelEventLoopGroup(int maxChannels, Executor executor, Object... args) {
if (maxChannels < 0) {
throw new IllegalArgumentException(String.format(
"maxChannels: %d (expected: >= 0)", maxChannels));
}
if (executor == null) {
throw new NullPointerException("executor");
}
if (args == null) {
childArgs = EmptyArrays.EMPTY_OBJECTS;
} else {
childArgs = args.clone();
}
this.maxChannels = maxChannels;
this.executor = executor;
tooManyChannels = ThrowableUtil.unknownStackTrace(
new ChannelException("too many channels (max: " + maxChannels + ')'),
ThreadPerChannelEventLoopGroup.class, "nextChild()");
}
/**
* Creates a new {@link EventLoop}. The default implementation creates a new {@link ThreadPerChannelEventLoop}.
*/
protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) throws Exception {
return new ThreadPerChannelEventLoop(this);
}
@Override
public Iterator<EventExecutor> iterator() {
return new ReadOnlyIterator<EventExecutor>(activeChildren.iterator());
}
@Override
public EventLoop next() {
throw new UnsupportedOperationException();
}
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
shuttingDown = true;
for (EventLoop l: activeChildren) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
for (EventLoop l: idleChildren) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
// Notify the future if there was no children.
if (isTerminated()) {
terminationFuture.trySuccess(null);
}
return terminationFuture();
}
@Override
public Future<?> terminationFuture() {
return terminationFuture;
}
@Override
@Deprecated
public void shutdown() {
shuttingDown = true;
for (EventLoop l: activeChildren) {
l.shutdown();
}
for (EventLoop l: idleChildren) {
l.shutdown();
}
// Notify the future if there was no children.
if (isTerminated()) {
terminationFuture.trySuccess(null);
}
}
@Override
public boolean isShuttingDown() {
for (EventLoop l: activeChildren) {
if (!l.isShuttingDown()) {
return false;
}
}
for (EventLoop l: idleChildren) {
if (!l.isShuttingDown()) {
return false;
}
}
return true;
}
@Override
public boolean isShutdown() {
for (EventLoop l: activeChildren) {
if (!l.isShutdown()) {
return false;
}
}
for (EventLoop l: idleChildren) {
if (!l.isShutdown()) {
return false;
}
}
return true;
}
@Override
public boolean isTerminated() {
for (EventLoop l: activeChildren) {
if (!l.isTerminated()) {
return false;
}
}
for (EventLoop l: idleChildren) {
if (!l.isTerminated()) {
return false;
}
}
return true;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long deadline = System.nanoTime() + unit.toNanos(timeout);
for (EventLoop l: activeChildren) {
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
return isTerminated();
}
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
for (EventLoop l: idleChildren) {
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
return isTerminated();
}
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
return isTerminated();
}
@Override
public ChannelFuture register(Channel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
try {
EventLoop l = nextChild();
return l.register(new DefaultChannelPromise(channel, l));
} catch (Throwable t) {
return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
}
}
@Override
public ChannelFuture register(ChannelPromise promise) {
try {
return nextChild().register(promise);
} catch (Throwable t) {
promise.setFailure(t);
return promise;
}
}
@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
try {
return nextChild().register(channel, promise);
} catch (Throwable t) {
promise.setFailure(t);
return promise;
}
}
private EventLoop nextChild() throws Exception {
if (shuttingDown) {
throw new RejectedExecutionException("shutting down");
}
EventLoop loop = idleChildren.poll();
if (loop == null) {
if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
throw tooManyChannels;
}
loop = newChild(childArgs);
loop.terminationFuture().addListener(childTerminationListener);
}
activeChildren.add(loop);
return loop;
}
}

View File

@ -1,267 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.oio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.FileRegion;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.util.internal.StringUtil;
import java.io.IOException;
/**
* Abstract base class for OIO which reads and writes bytes from/to a Socket
*
* @deprecated use NIO / EPOLL / KQUEUE transport.
*/
public abstract class AbstractOioByteChannel extends AbstractOioChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(FileRegion.class) + ')';
/**
* @see AbstractOioByteChannel#AbstractOioByteChannel(Channel)
*/
protected AbstractOioByteChannel(Channel parent) {
super(parent);
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
/**
* Determine if the input side of this channel is shutdown.
* @return {@code true} if the input side of this channel is shutdown.
*/
protected abstract boolean isInputShutdown();
/**
* Shutdown the input side of this channel.
* @return A channel future that will complete when the shutdown is complete.
*/
protected abstract ChannelFuture shutdownInput();
private void closeOnRead(ChannelPipeline pipeline) {
if (isOpen()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
shutdownInput();
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
unsafe().close(unsafe().voidPromise());
}
pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
}
}
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
RecvByteBufAllocator.Handle allocHandle) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
}
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) {
closeOnRead(pipeline);
}
}
@Override
protected void doRead() {
final ChannelConfig config = config();
if (isInputShutdown() || !readPending) {
// We have to check readPending here because the Runnable to read could have been scheduled and later
// during the same read loop readPending was set to false.
return;
}
// In OIO we should set readPending to false even if the read was not successful so we can schedule
// another read on the event loop if no reads are done.
readPending = false;
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
boolean readData = false;
try {
byteBuf = allocHandle.allocate(allocator);
do {
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
if (!byteBuf.isReadable()) { // nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
}
break;
} else {
readData = true;
}
final int available = available();
if (available <= 0) {
break;
}
// Oio collects consecutive read operations into 1 ByteBuf before propagating up the pipeline.
if (!byteBuf.isWritable()) {
final int capacity = byteBuf.capacity();
final int maxCapacity = byteBuf.maxCapacity();
if (capacity == maxCapacity) {
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = allocHandle.allocate(allocator);
} else {
final int writerIndex = byteBuf.writerIndex();
if (writerIndex + available > maxCapacity) {
byteBuf.capacity(maxCapacity);
} else {
byteBuf.ensureWritable(available);
}
}
}
} while (allocHandle.continueReading());
if (byteBuf != null) {
// It is possible we allocated a buffer because the previous one was not writable, but then didn't use
// it because allocHandle.continueReading() returned false.
if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
byteBuf = null;
}
if (readData) {
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
}
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (readPending || config.isAutoRead() || !readData && isActive()) {
// Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we
// should execute read() again because no data may have been read.
read();
}
}
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
Object msg = in.current();
if (msg == null) {
// nothing left to write
break;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
while (readableBytes > 0) {
doWriteBytes(buf);
int newReadableBytes = buf.readableBytes();
in.progress(readableBytes - newReadableBytes);
readableBytes = newReadableBytes;
}
in.remove();
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
long transferred = region.transferred();
doWriteFileRegion(region);
in.progress(region.transferred() - transferred);
in.remove();
} else {
in.remove(new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg)));
}
}
}
@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
if (msg instanceof ByteBuf || msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
/**
* Return the number of bytes ready to read from the underlying Socket.
*/
protected abstract int available();
/**
* Read bytes from the underlying Socket.
*
* @param buf the {@link ByteBuf} into which the read bytes will be written
* @return amount the number of bytes read. This may return a negative amount if the underlying
* Socket was closed
* @throws Exception is thrown if an error occurred
*/
protected abstract int doReadBytes(ByteBuf buf) throws Exception;
/**
* Write the data which is hold by the {@link ByteBuf} to the underlying Socket.
*
* @param buf the {@link ByteBuf} which holds the data to transfer
* @throws Exception is thrown if an error occurred
*/
protected abstract void doWriteBytes(ByteBuf buf) throws Exception;
/**
* Write the data which is hold by the {@link FileRegion} to the underlying Socket.
*
* @param region the {@link FileRegion} which holds the data to transfer
* @throws Exception is thrown if an error occurred
*/
protected abstract void doWriteFileRegion(FileRegion region) throws Exception;
}

View File

@ -1,161 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.oio;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.ThreadPerChannelEventLoop;
import java.net.SocketAddress;
/**
* Abstract base class for {@link Channel} implementations that use Old-Blocking-IO
*
* @deprecated use NIO / EPOLL / KQUEUE transport.
*/
@Deprecated
public abstract class AbstractOioChannel extends AbstractChannel {
protected static final int SO_TIMEOUT = 1000;
boolean readPending;
private final Runnable readTask = new Runnable() {
@Override
public void run() {
doRead();
}
};
private final Runnable clearReadPendingRunnable = new Runnable() {
@Override
public void run() {
readPending = false;
}
};
/**
* @see AbstractChannel#AbstractChannel(Channel)
*/
protected AbstractOioChannel(Channel parent) {
super(parent);
}
@Override
protected AbstractUnsafe newUnsafe() {
return new DefaultOioUnsafe();
}
private final class DefaultOioUnsafe extends AbstractUnsafe {
@Override
public void connect(
final SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
boolean wasActive = isActive();
doConnect(remoteAddress, localAddress);
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case.
boolean active = isActive();
safeSetSuccess(promise);
if (!wasActive && active) {
pipeline().fireChannelActive();
}
} catch (Throwable t) {
safeSetFailure(promise, annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof ThreadPerChannelEventLoop;
}
/**
* Connect to the remote peer using the given localAddress if one is specified or {@code null} otherwise.
*/
protected abstract void doConnect(
SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
@Override
protected void doBeginRead() throws Exception {
if (readPending) {
return;
}
readPending = true;
eventLoop().execute(readTask);
}
protected abstract void doRead();
/**
* @deprecated No longer supported.
* No longer supported.
*/
@Deprecated
protected boolean isReadPending() {
return readPending;
}
/**
* @deprecated Use {@link #clearReadPending()} if appropriate instead.
* No longer supported.
*/
@Deprecated
protected void setReadPending(final boolean readPending) {
if (isRegistered()) {
EventLoop eventLoop = eventLoop();
if (eventLoop.inEventLoop()) {
this.readPending = readPending;
} else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
AbstractOioChannel.this.readPending = readPending;
}
});
}
} else {
this.readPending = readPending;
}
}
/**
* Set read pending to {@code false}.
*/
protected final void clearReadPending() {
if (isRegistered()) {
EventLoop eventLoop = eventLoop();
if (eventLoop.inEventLoop()) {
readPending = false;
} else {
eventLoop.execute(clearReadPendingRunnable);
}
} else {
// Best effort if we are not registered yet clear readPending. This happens during channel initialization.
readPending = false;
}
}
}

View File

@ -1,113 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.oio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.RecvByteBufAllocator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Abstract base class for OIO which reads and writes objects from/to a Socket
*
* @deprecated use NIO / EPOLL / KQUEUE transport.
*/
@Deprecated
public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
private final List<Object> readBuf = new ArrayList<Object>();
protected AbstractOioMessageChannel(Channel parent) {
super(parent);
}
@Override
protected void doRead() {
if (!readPending) {
// We have to check readPending here because the Runnable to read could have been scheduled and later
// during the same read loop readPending was set to false.
return;
}
// In OIO we should set readPending to false even if the read was not successful so we can schedule
// another read on the event loop if no reads are done.
readPending = false;
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
do {
// Perform a read.
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
boolean readData = false;
int size = readBuf.size();
if (size > 0) {
readData = true;
for (int i = 0; i < size; i++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
}
if (exception != null) {
if (exception instanceof IOException) {
closed = true;
}
pipeline.fireExceptionCaught(exception);
}
if (closed) {
if (isOpen()) {
unsafe().close(unsafe().voidPromise());
}
} else if (readPending || config.isAutoRead() || !readData && isActive()) {
// Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we
// should execute read() again because no data may have been read.
read();
}
}
/**
* Read messages into the given array and return the amount which was read.
*/
protected abstract int doReadMessages(List<Object> msgs) throws Exception;
}

View File

@ -1,173 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.oio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.FileRegion;
import io.netty.channel.RecvByteBufAllocator;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.WritableByteChannel;
/**
* Abstract base class for OIO Channels that are based on streams.
*
* @deprecated use NIO / EPOLL / KQUEUE transport.
*/
@Deprecated
public abstract class OioByteStreamChannel extends AbstractOioByteChannel {
private static final InputStream CLOSED_IN = new InputStream() {
@Override
public int read() {
return -1;
}
};
private static final OutputStream CLOSED_OUT = new OutputStream() {
@Override
public void write(int b) throws IOException {
throw new ClosedChannelException();
}
};
private InputStream is;
private OutputStream os;
private WritableByteChannel outChannel;
/**
* Create a new instance
*
* @param parent the parent {@link Channel} which was used to create this instance. This can be null if the
* {@link} has no parent as it was created by your self.
*/
protected OioByteStreamChannel(Channel parent) {
super(parent);
}
/**
* Activate this instance. After this call {@link #isActive()} will return {@code true}.
*/
protected final void activate(InputStream is, OutputStream os) {
if (this.is != null) {
throw new IllegalStateException("input was set already");
}
if (this.os != null) {
throw new IllegalStateException("output was set already");
}
if (is == null) {
throw new NullPointerException("is");
}
if (os == null) {
throw new NullPointerException("os");
}
this.is = is;
this.os = os;
}
@Override
public boolean isActive() {
InputStream is = this.is;
if (is == null || is == CLOSED_IN) {
return false;
}
OutputStream os = this.os;
return !(os == null || os == CLOSED_OUT);
}
@Override
protected int available() {
try {
return is.available();
} catch (IOException ignored) {
return 0;
}
}
@Override
protected int doReadBytes(ByteBuf buf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(Math.max(1, Math.min(available(), buf.maxWritableBytes())));
return buf.writeBytes(is, allocHandle.attemptedBytesRead());
}
@Override
protected void doWriteBytes(ByteBuf buf) throws Exception {
OutputStream os = this.os;
if (os == null) {
throw new NotYetConnectedException();
}
buf.readBytes(os, buf.readableBytes());
}
@Override
protected void doWriteFileRegion(FileRegion region) throws Exception {
OutputStream os = this.os;
if (os == null) {
throw new NotYetConnectedException();
}
if (outChannel == null) {
outChannel = Channels.newChannel(os);
}
long written = 0;
for (;;) {
long localWritten = region.transferTo(outChannel, written);
if (localWritten == -1) {
checkEOF(region);
return;
}
written += localWritten;
if (written >= region.count()) {
return;
}
}
}
private static void checkEOF(FileRegion region) throws IOException {
if (region.transferred() < region.count()) {
throw new EOFException("Expected to be able to write " + region.count() + " bytes, " +
"but only wrote " + region.transferred());
}
}
@Override
protected void doClose() throws Exception {
InputStream is = this.is;
OutputStream os = this.os;
this.is = CLOSED_IN;
this.os = CLOSED_OUT;
try {
if (is != null) {
is.close();
}
} finally {
if (os != null) {
os.close();
}
}
}
}

View File

@ -1,88 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.oio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ThreadPerChannelEventLoopGroup;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
* {@link EventLoopGroup} which is used to handle OIO {@link Channel}'s. Each {@link Channel} will be handled by its
* own {@link EventLoop} to not block others.
*
* @deprecated use NIO / EPOLL / KQUEUE transport.
*/
@Deprecated
public class OioEventLoopGroup extends ThreadPerChannelEventLoopGroup {
/**
* Create a new {@link OioEventLoopGroup} with no limit in place.
*/
public OioEventLoopGroup() {
this(0);
}
/**
* Create a new {@link OioEventLoopGroup}.
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(ChannelPromise)} method.
* Use {@code 0} to use no limit
*/
public OioEventLoopGroup(int maxChannels) {
this(maxChannels, Executors.defaultThreadFactory());
}
/**
* Create a new {@link OioEventLoopGroup}.
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(ChannelPromise)} method.
* Use {@code 0} to use no limit
* @param executor the {@link Executor} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s
*/
public OioEventLoopGroup(int maxChannels, Executor executor) {
super(maxChannels, executor);
}
/**
* Create a new {@link OioEventLoopGroup}.
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(ChannelPromise)} method.
* Use {@code 0} to use no limit
* @param threadFactory the {@link ThreadFactory} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s
*/
public OioEventLoopGroup(int maxChannels, ThreadFactory threadFactory) {
super(maxChannels, threadFactory);
}
}

View File

@ -1,24 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Old blocking I/O based channel API implementation - recommended for
* a small number of connections (&lt; 1000).
*
* @deprecated use NIO / EPOLL / KQUEUE transport.
*/
@Deprecated
package io.netty.channel.oio;

View File

@ -1,207 +0,0 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.PreferHeapByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DefaultDatagramChannelConfig;
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Map;
import static io.netty.channel.ChannelOption.SO_TIMEOUT;
final class DefaultOioDatagramChannelConfig extends DefaultDatagramChannelConfig implements OioDatagramChannelConfig {
DefaultOioDatagramChannelConfig(DatagramChannel channel, DatagramSocket javaSocket) {
super(channel, javaSocket);
setAllocator(new PreferHeapByteBufAllocator(getAllocator()));
}
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), SO_TIMEOUT);
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_TIMEOUT) {
return (T) Integer.valueOf(getSoTimeout());
}
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == SO_TIMEOUT) {
setSoTimeout((Integer) value);
} else {
return super.setOption(option, value);
}
return true;
}
@Override
public OioDatagramChannelConfig setSoTimeout(int timeout) {
try {
javaSocket().setSoTimeout(timeout);
} catch (IOException e) {
throw new ChannelException(e);
}
return this;
}
@Override
public int getSoTimeout() {
try {
return javaSocket().getSoTimeout();
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public OioDatagramChannelConfig setBroadcast(boolean broadcast) {
super.setBroadcast(broadcast);
return this;
}
@Override
public OioDatagramChannelConfig setInterface(InetAddress interfaceAddress) {
super.setInterface(interfaceAddress);
return this;
}
@Override
public OioDatagramChannelConfig setLoopbackModeDisabled(boolean loopbackModeDisabled) {
super.setLoopbackModeDisabled(loopbackModeDisabled);
return this;
}
@Override
public OioDatagramChannelConfig setNetworkInterface(NetworkInterface networkInterface) {
super.setNetworkInterface(networkInterface);
return this;
}
@Override
public OioDatagramChannelConfig setReuseAddress(boolean reuseAddress) {
super.setReuseAddress(reuseAddress);
return this;
}
@Override
public OioDatagramChannelConfig setReceiveBufferSize(int receiveBufferSize) {
super.setReceiveBufferSize(receiveBufferSize);
return this;
}
@Override
public OioDatagramChannelConfig setSendBufferSize(int sendBufferSize) {
super.setSendBufferSize(sendBufferSize);
return this;
}
@Override
public OioDatagramChannelConfig setTimeToLive(int ttl) {
super.setTimeToLive(ttl);
return this;
}
@Override
public OioDatagramChannelConfig setTrafficClass(int trafficClass) {
super.setTrafficClass(trafficClass);
return this;
}
@Override
public OioDatagramChannelConfig setWriteSpinCount(int writeSpinCount) {
super.setWriteSpinCount(writeSpinCount);
return this;
}
@Override
public OioDatagramChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
}
@Override
public OioDatagramChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public OioDatagramChannelConfig setAllocator(ByteBufAllocator allocator) {
super.setAllocator(allocator);
return this;
}
@Override
public OioDatagramChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
super.setRecvByteBufAllocator(allocator);
return this;
}
@Override
public OioDatagramChannelConfig setAutoRead(boolean autoRead) {
super.setAutoRead(autoRead);
return this;
}
@Override
public OioDatagramChannelConfig setAutoClose(boolean autoClose) {
super.setAutoClose(autoClose);
return this;
}
@Override
public OioDatagramChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
@Override
public OioDatagramChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
@Override
public OioDatagramChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
super.setWriteBufferWaterMark(writeBufferWaterMark);
return this;
}
@Override
public OioDatagramChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
}

View File

@ -1,197 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.PreferHeapByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.DefaultServerSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannel;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Map;
import static io.netty.channel.ChannelOption.*;
/**
* Default {@link OioServerSocketChannelConfig} implementation
*
* @deprecated use NIO / EPOLL / KQUEUE transport.
*/
@Deprecated
public class DefaultOioServerSocketChannelConfig extends DefaultServerSocketChannelConfig implements
OioServerSocketChannelConfig {
@Deprecated
public DefaultOioServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket) {
super(channel, javaSocket);
setAllocator(new PreferHeapByteBufAllocator(getAllocator()));
}
DefaultOioServerSocketChannelConfig(OioServerSocketChannel channel, ServerSocket javaSocket) {
super(channel, javaSocket);
setAllocator(new PreferHeapByteBufAllocator(getAllocator()));
}
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(
super.getOptions(), SO_TIMEOUT);
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_TIMEOUT) {
return (T) Integer.valueOf(getSoTimeout());
}
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == SO_TIMEOUT) {
setSoTimeout((Integer) value);
} else {
return super.setOption(option, value);
}
return true;
}
@Override
public OioServerSocketChannelConfig setSoTimeout(int timeout) {
try {
javaSocket.setSoTimeout(timeout);
} catch (IOException e) {
throw new ChannelException(e);
}
return this;
}
@Override
public int getSoTimeout() {
try {
return javaSocket.getSoTimeout();
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public OioServerSocketChannelConfig setBacklog(int backlog) {
super.setBacklog(backlog);
return this;
}
@Override
public OioServerSocketChannelConfig setReuseAddress(boolean reuseAddress) {
super.setReuseAddress(reuseAddress);
return this;
}
@Override
public OioServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
super.setReceiveBufferSize(receiveBufferSize);
return this;
}
@Override
public OioServerSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
super.setPerformancePreferences(connectionTime, latency, bandwidth);
return this;
}
@Override
public OioServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
}
@Override
@Deprecated
public OioServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public OioServerSocketChannelConfig setWriteSpinCount(int writeSpinCount) {
super.setWriteSpinCount(writeSpinCount);
return this;
}
@Override
public OioServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) {
super.setAllocator(allocator);
return this;
}
@Override
public OioServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
super.setRecvByteBufAllocator(allocator);
return this;
}
@Override
public OioServerSocketChannelConfig setAutoRead(boolean autoRead) {
super.setAutoRead(autoRead);
return this;
}
@Override
protected void autoReadCleared() {
if (channel instanceof OioServerSocketChannel) {
((OioServerSocketChannel) channel).clearReadPending0();
}
}
@Override
public OioServerSocketChannelConfig setAutoClose(boolean autoClose) {
super.setAutoClose(autoClose);
return this;
}
@Override
public OioServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
@Override
public OioServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
@Override
public OioServerSocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
super.setWriteBufferWaterMark(writeBufferWaterMark);
return this;
}
@Override
public OioServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
}

View File

@ -1,225 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.PreferHeapByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.SocketChannel;
import java.io.IOException;
import java.net.Socket;
import java.util.Map;
import static io.netty.channel.ChannelOption.*;
/**
* Default {@link OioSocketChannelConfig} implementation
*
* @deprecated use NIO / EPOLL / KQUEUE transport.
*/
@Deprecated
public class DefaultOioSocketChannelConfig extends DefaultSocketChannelConfig implements OioSocketChannelConfig {
@Deprecated
public DefaultOioSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
super(channel, javaSocket);
setAllocator(new PreferHeapByteBufAllocator(getAllocator()));
}
DefaultOioSocketChannelConfig(OioSocketChannel channel, Socket javaSocket) {
super(channel, javaSocket);
setAllocator(new PreferHeapByteBufAllocator(getAllocator()));
}
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(
super.getOptions(), SO_TIMEOUT);
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_TIMEOUT) {
return (T) Integer.valueOf(getSoTimeout());
}
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == SO_TIMEOUT) {
setSoTimeout((Integer) value);
} else {
return super.setOption(option, value);
}
return true;
}
@Override
public OioSocketChannelConfig setSoTimeout(int timeout) {
try {
javaSocket.setSoTimeout(timeout);
} catch (IOException e) {
throw new ChannelException(e);
}
return this;
}
@Override
public int getSoTimeout() {
try {
return javaSocket.getSoTimeout();
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public OioSocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) {
super.setTcpNoDelay(tcpNoDelay);
return this;
}
@Override
public OioSocketChannelConfig setSoLinger(int soLinger) {
super.setSoLinger(soLinger);
return this;
}
@Override
public OioSocketChannelConfig setSendBufferSize(int sendBufferSize) {
super.setSendBufferSize(sendBufferSize);
return this;
}
@Override
public OioSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
super.setReceiveBufferSize(receiveBufferSize);
return this;
}
@Override
public OioSocketChannelConfig setKeepAlive(boolean keepAlive) {
super.setKeepAlive(keepAlive);
return this;
}
@Override
public OioSocketChannelConfig setTrafficClass(int trafficClass) {
super.setTrafficClass(trafficClass);
return this;
}
@Override
public OioSocketChannelConfig setReuseAddress(boolean reuseAddress) {
super.setReuseAddress(reuseAddress);
return this;
}
@Override
public OioSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
super.setPerformancePreferences(connectionTime, latency, bandwidth);
return this;
}
@Override
public OioSocketChannelConfig setAllowHalfClosure(boolean allowHalfClosure) {
super.setAllowHalfClosure(allowHalfClosure);
return this;
}
@Override
public OioSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
}
@Override
@Deprecated
public OioSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public OioSocketChannelConfig setWriteSpinCount(int writeSpinCount) {
super.setWriteSpinCount(writeSpinCount);
return this;
}
@Override
public OioSocketChannelConfig setAllocator(ByteBufAllocator allocator) {
super.setAllocator(allocator);
return this;
}
@Override
public OioSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
super.setRecvByteBufAllocator(allocator);
return this;
}
@Override
public OioSocketChannelConfig setAutoRead(boolean autoRead) {
super.setAutoRead(autoRead);
return this;
}
@Override
protected void autoReadCleared() {
if (channel instanceof OioSocketChannel) {
((OioSocketChannel) channel).clearReadPending0();
}
}
@Override
public OioSocketChannelConfig setAutoClose(boolean autoClose) {
super.setAutoClose(autoClose);
return this;
}
@Override
public OioSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
@Override
public OioSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
@Override
public OioSocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
super.setWriteBufferWaterMark(writeBufferWaterMark);
return this;
}
@Override
public OioSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
}

View File

@ -1,446 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.oio.AbstractOioMessageChannel;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.channels.NotYetConnectedException;
import java.util.List;
import java.util.Locale;
/**
* An OIO datagram {@link Channel} that sends and receives an
* {@link AddressedEnvelope AddressedEnvelope<ByteBuf, SocketAddress>}.
*
* @see AddressedEnvelope
* @see DatagramPacket
* @deprecated use NIO / EPOLL / KQUEUE transport.
*/
@Deprecated
public class OioDatagramChannel extends AbstractOioMessageChannel
implements DatagramChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioDatagramChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(true);
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(SocketAddress.class) + ">, " +
StringUtil.simpleClassName(ByteBuf.class) + ')';
private final MulticastSocket socket;
private final OioDatagramChannelConfig config;
private final java.net.DatagramPacket tmpPacket = new java.net.DatagramPacket(EmptyArrays.EMPTY_BYTES, 0);
private static MulticastSocket newSocket() {
try {
return new MulticastSocket(null);
} catch (Exception e) {
throw new ChannelException("failed to create a new socket", e);
}
}
/**
* Create a new instance with an new {@link MulticastSocket}.
*/
public OioDatagramChannel() {
this(newSocket());
}
/**
* Create a new instance from the given {@link MulticastSocket}.
*
* @param socket the {@link MulticastSocket} which is used by this instance
*/
public OioDatagramChannel(MulticastSocket socket) {
super(null);
boolean success = false;
try {
socket.setSoTimeout(SO_TIMEOUT);
socket.setBroadcast(false);
success = true;
} catch (SocketException e) {
throw new ChannelException(
"Failed to configure the datagram socket timeout.", e);
} finally {
if (!success) {
socket.close();
}
}
this.socket = socket;
config = new DefaultOioDatagramChannelConfig(this, socket);
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
/**
* {@inheritDoc}
*
* This can be safetly cast to {@link OioDatagramChannelConfig}.
*/
@Override
// TODO: Change return type to OioDatagramChannelConfig in next major release
public DatagramChannelConfig config() {
return config;
}
@Override
public boolean isOpen() {
return !socket.isClosed();
}
@Override
@SuppressWarnings("deprecation")
public boolean isActive() {
return isOpen()
&& (config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
|| socket.isBound());
}
@Override
public boolean isConnected() {
return socket.isConnected();
}
@Override
protected SocketAddress localAddress0() {
return socket.getLocalSocketAddress();
}
@Override
protected SocketAddress remoteAddress0() {
return socket.getRemoteSocketAddress();
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
socket.bind(localAddress);
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
}
@Override
protected void doConnect(SocketAddress remoteAddress,
SocketAddress localAddress) throws Exception {
if (localAddress != null) {
socket.bind(localAddress);
}
boolean success = false;
try {
socket.connect(remoteAddress);
success = true;
} finally {
if (!success) {
try {
socket.close();
} catch (Throwable t) {
logger.warn("Failed to close a socket.", t);
}
}
}
}
@Override
protected void doDisconnect() throws Exception {
socket.disconnect();
}
@Override
protected void doClose() throws Exception {
socket.close();
}
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
DatagramChannelConfig config = config();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
ByteBuf data = config.getAllocator().heapBuffer(allocHandle.guess());
boolean free = true;
try {
// Ensure we null out the address which may have been set before.
tmpPacket.setAddress(null);
tmpPacket.setData(data.array(), data.arrayOffset(), data.capacity());
socket.receive(tmpPacket);
InetSocketAddress remoteAddr = (InetSocketAddress) tmpPacket.getSocketAddress();
allocHandle.lastBytesRead(tmpPacket.getLength());
buf.add(new DatagramPacket(data.writerIndex(allocHandle.lastBytesRead()), localAddress(), remoteAddr));
free = false;
return 1;
} catch (SocketTimeoutException e) {
// Expected
return 0;
} catch (SocketException e) {
if (!e.getMessage().toLowerCase(Locale.US).contains("socket closed")) {
throw e;
}
return -1;
} catch (Throwable cause) {
PlatformDependent.throwException(cause);
return -1;
} finally {
if (free) {
data.release();
}
}
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
final Object o = in.current();
if (o == null) {
break;
}
final ByteBuf data;
final SocketAddress remoteAddress;
if (o instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) o;
remoteAddress = envelope.recipient();
data = envelope.content();
} else {
data = (ByteBuf) o;
remoteAddress = null;
}
final int length = data.readableBytes();
try {
if (remoteAddress != null) {
tmpPacket.setSocketAddress(remoteAddress);
} else {
if (!isConnected()) {
// If not connected we should throw a NotYetConnectedException() to be consistent with
// NioDatagramChannel
throw new NotYetConnectedException();
}
// Ensure we null out the address which may have been set before.
tmpPacket.setAddress(null);
}
if (data.hasArray()) {
tmpPacket.setData(data.array(), data.arrayOffset() + data.readerIndex(), length);
} else {
tmpPacket.setData(ByteBufUtil.getBytes(data, data.readerIndex(), length));
}
socket.send(tmpPacket);
in.remove();
} catch (Exception e) {
// Continue on write error as a DatagramChannel can write to multiple remote peers
//
// See https://github.com/netty/netty/issues/2665
in.remove(e);
}
}
}
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof DatagramPacket || msg instanceof ByteBuf) {
return msg;
}
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
if (e.content() instanceof ByteBuf) {
return msg;
}
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
@Override
public ChannelFuture joinGroup(InetAddress multicastAddress) {
return joinGroup(multicastAddress, newPromise());
}
@Override
public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
ensureBound();
try {
socket.joinGroup(multicastAddress);
promise.setSuccess();
} catch (IOException e) {
promise.setFailure(e);
}
return promise;
}
@Override
public ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
return joinGroup(multicastAddress, networkInterface, newPromise());
}
@Override
public ChannelFuture joinGroup(
InetSocketAddress multicastAddress, NetworkInterface networkInterface,
ChannelPromise promise) {
ensureBound();
try {
socket.joinGroup(multicastAddress, networkInterface);
promise.setSuccess();
} catch (IOException e) {
promise.setFailure(e);
}
return promise;
}
@Override
public ChannelFuture joinGroup(
InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
return newFailedFuture(new UnsupportedOperationException());
}
@Override
public ChannelFuture joinGroup(
InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
ChannelPromise promise) {
promise.setFailure(new UnsupportedOperationException());
return promise;
}
private void ensureBound() {
if (!isActive()) {
throw new IllegalStateException(
DatagramChannel.class.getName() +
" must be bound to join a group.");
}
}
@Override
public ChannelFuture leaveGroup(InetAddress multicastAddress) {
return leaveGroup(multicastAddress, newPromise());
}
@Override
public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
try {
socket.leaveGroup(multicastAddress);
promise.setSuccess();
} catch (IOException e) {
promise.setFailure(e);
}
return promise;
}
@Override
public ChannelFuture leaveGroup(
InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
return leaveGroup(multicastAddress, networkInterface, newPromise());
}
@Override
public ChannelFuture leaveGroup(
InetSocketAddress multicastAddress, NetworkInterface networkInterface,
ChannelPromise promise) {
try {
socket.leaveGroup(multicastAddress, networkInterface);
promise.setSuccess();
} catch (IOException e) {
promise.setFailure(e);
}
return promise;
}
@Override
public ChannelFuture leaveGroup(
InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
return newFailedFuture(new UnsupportedOperationException());
}
@Override
public ChannelFuture leaveGroup(
InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source,
ChannelPromise promise) {
promise.setFailure(new UnsupportedOperationException());
return promise;
}
@Override
public ChannelFuture block(InetAddress multicastAddress,
NetworkInterface networkInterface, InetAddress sourceToBlock) {
return newFailedFuture(new UnsupportedOperationException());
}
@Override
public ChannelFuture block(InetAddress multicastAddress,
NetworkInterface networkInterface, InetAddress sourceToBlock,
ChannelPromise promise) {
promise.setFailure(new UnsupportedOperationException());
return promise;
}
@Override
public ChannelFuture block(InetAddress multicastAddress,
InetAddress sourceToBlock) {
return newFailedFuture(new UnsupportedOperationException());
}
@Override
public ChannelFuture block(InetAddress multicastAddress,
InetAddress sourceToBlock, ChannelPromise promise) {
promise.setFailure(new UnsupportedOperationException());
return promise;
}
}

View File

@ -1,101 +0,0 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.DatagramChannelConfig;
import java.net.InetAddress;
import java.net.NetworkInterface;
/**
* @deprecated use NIO / EPOLL / KQUEUE transport.
*/
@Deprecated
public interface OioDatagramChannelConfig extends DatagramChannelConfig {
/**
* Sets the maximal time a operation on the underlying socket may block.
*/
OioDatagramChannelConfig setSoTimeout(int timeout);
/**
* Returns the maximal time a operation on the underlying socket may block.
*/
int getSoTimeout();
@Override
OioDatagramChannelConfig setSendBufferSize(int sendBufferSize);
@Override
OioDatagramChannelConfig setReceiveBufferSize(int receiveBufferSize);
@Override
OioDatagramChannelConfig setTrafficClass(int trafficClass);
@Override
OioDatagramChannelConfig setReuseAddress(boolean reuseAddress);
@Override
OioDatagramChannelConfig setBroadcast(boolean broadcast);
@Override
OioDatagramChannelConfig setLoopbackModeDisabled(boolean loopbackModeDisabled);
@Override
OioDatagramChannelConfig setTimeToLive(int ttl);
@Override
OioDatagramChannelConfig setInterface(InetAddress interfaceAddress);
@Override
OioDatagramChannelConfig setNetworkInterface(NetworkInterface networkInterface);
@Override
OioDatagramChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override
OioDatagramChannelConfig setWriteSpinCount(int writeSpinCount);
@Override
OioDatagramChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override
OioDatagramChannelConfig setAllocator(ByteBufAllocator allocator);
@Override
OioDatagramChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator);
@Override
OioDatagramChannelConfig setAutoRead(boolean autoRead);
@Override
OioDatagramChannelConfig setAutoClose(boolean autoClose);
@Override
OioDatagramChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);
@Override
OioDatagramChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark);
@Override
OioDatagramChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
@Override
OioDatagramChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
}

View File

@ -1,211 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.oio.AbstractOioMessageChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.util.internal.SocketUtils;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* {@link ServerSocketChannel} which accepts new connections and create the {@link OioSocketChannel}'s for them.
*
* This implementation use Old-Blocking-IO.
*
* @deprecated use NIO / EPOLL / KQUEUE transport.
*/
@Deprecated
public class OioServerSocketChannel extends AbstractOioMessageChannel
implements ServerSocketChannel {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(OioServerSocketChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 1);
private static ServerSocket newServerSocket() {
try {
return new ServerSocket();
} catch (IOException e) {
throw new ChannelException("failed to create a server socket", e);
}
}
final ServerSocket socket;
final Lock shutdownLock = new ReentrantLock();
private final OioServerSocketChannelConfig config;
/**
* Create a new instance with an new {@link Socket}
*/
public OioServerSocketChannel() {
this(newServerSocket());
}
/**
* Create a new instance from the given {@link ServerSocket}
*
* @param socket the {@link ServerSocket} which is used by this instance
*/
public OioServerSocketChannel(ServerSocket socket) {
super(null);
if (socket == null) {
throw new NullPointerException("socket");
}
boolean success = false;
try {
socket.setSoTimeout(SO_TIMEOUT);
success = true;
} catch (IOException e) {
throw new ChannelException(
"Failed to set the server socket timeout.", e);
} finally {
if (!success) {
try {
socket.close();
} catch (IOException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e);
}
}
}
}
this.socket = socket;
config = new DefaultOioServerSocketChannelConfig(this, socket);
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
public OioServerSocketChannelConfig config() {
return config;
}
@Override
public InetSocketAddress remoteAddress() {
return null;
}
@Override
public boolean isOpen() {
return !socket.isClosed();
}
@Override
public boolean isActive() {
return isOpen() && socket.isBound();
}
@Override
protected SocketAddress localAddress0() {
return SocketUtils.localSocketAddress(socket);
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
socket.bind(localAddress, config.getBacklog());
}
@Override
protected void doClose() throws Exception {
socket.close();
}
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
if (socket.isClosed()) {
return -1;
}
try {
Socket s = socket.accept();
try {
buf.add(new OioSocketChannel(this, s));
return 1;
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
s.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
} catch (SocketTimeoutException e) {
// Expected
}
return 0;
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doConnect(
SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected SocketAddress remoteAddress0() {
return null;
}
@Override
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}
@Deprecated
@Override
protected void setReadPending(boolean readPending) {
super.setReadPending(readPending);
}
final void clearReadPending0() {
super.clearReadPending();
}
}

View File

@ -1,103 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.ServerSocketChannelConfig;
/**
* A {@link ServerSocketChannelConfig} for a {@link OioServerSocketChannel}.
*
* <h3>Available options</h3>
*
* In addition to the options provided by {@link ServerSocketChannelConfig},
* {@link OioServerSocketChannelConfig} allows the following options in the
* option map:
*
* <table border="1" cellspacing="0" cellpadding="6">
* <tr>
* <th>Name</th><th>Associated setter method</th>
* </tr><tr>
* <td>{@link ChannelOption#SO_TIMEOUT}</td><td>{@link #setSoTimeout(int)}</td>
* </tr>
* </table>
*
* @deprecated use NIO / EPOLL / KQUEUE transport.
*/
@Deprecated
public interface OioServerSocketChannelConfig extends ServerSocketChannelConfig {
/**
* Sets the maximal time a operation on the underlying socket may block.
*/
OioServerSocketChannelConfig setSoTimeout(int timeout);
/**
* Returns the maximal time a operation on the underlying socket may block.
*/
int getSoTimeout();
@Override
OioServerSocketChannelConfig setBacklog(int backlog);
@Override
OioServerSocketChannelConfig setReuseAddress(boolean reuseAddress);
@Override
OioServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize);
@Override
OioServerSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth);
@Override
OioServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override
@Deprecated
OioServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override
OioServerSocketChannelConfig setWriteSpinCount(int writeSpinCount);
@Override
OioServerSocketChannelConfig setAllocator(ByteBufAllocator allocator);
@Override
OioServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator);
@Override
OioServerSocketChannelConfig setAutoRead(boolean autoRead);
@Override
OioServerSocketChannelConfig setAutoClose(boolean autoClose);
@Override
OioServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
@Override
OioServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
@Override
OioServerSocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark);
@Override
OioServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);
}

View File

@ -1,350 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop;
import io.netty.channel.oio.OioByteStreamChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.internal.SocketUtils;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
/**
* A {@link SocketChannel} which is using Old-Blocking-IO
*
* @deprecated use NIO / EPOLL / KQUEUE transport.
*/
@Deprecated
public class OioSocketChannel extends OioByteStreamChannel implements SocketChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(OioSocketChannel.class);
private final Socket socket;
private final OioSocketChannelConfig config;
/**
* Create a new instance with an new {@link Socket}
*/
public OioSocketChannel() {
this(new Socket());
}
/**
* Create a new instance from the given {@link Socket}
*
* @param socket the {@link Socket} which is used by this instance
*/
public OioSocketChannel(Socket socket) {
this(null, socket);
}
/**
* Create a new instance from the given {@link Socket}
*
* @param parent the parent {@link Channel} which was used to create this instance. This can be null if the
* {@link} has no parent as it was created by your self.
* @param socket the {@link Socket} which is used by this instance
*/
public OioSocketChannel(Channel parent, Socket socket) {
super(parent);
this.socket = socket;
config = new DefaultOioSocketChannelConfig(this, socket);
boolean success = false;
try {
if (socket.isConnected()) {
activate(socket.getInputStream(), socket.getOutputStream());
}
socket.setSoTimeout(SO_TIMEOUT);
success = true;
} catch (Exception e) {
throw new ChannelException("failed to initialize a socket", e);
} finally {
if (!success) {
try {
socket.close();
} catch (IOException e) {
logger.warn("Failed to close a socket.", e);
}
}
}
}
@Override
public ServerSocketChannel parent() {
return (ServerSocketChannel) super.parent();
}
@Override
public OioSocketChannelConfig config() {
return config;
}
@Override
public boolean isOpen() {
return !socket.isClosed();
}
@Override
public boolean isActive() {
return !socket.isClosed() && socket.isConnected();
}
@Override
public boolean isOutputShutdown() {
return socket.isOutputShutdown() || !isActive();
}
@Override
public boolean isInputShutdown() {
return socket.isInputShutdown() || !isActive();
}
@Override
public boolean isShutdown() {
return socket.isInputShutdown() && socket.isOutputShutdown() || !isActive();
}
@UnstableApi
@Override
protected final void doShutdownOutput() throws Exception {
shutdownOutput0();
}
@Override
public ChannelFuture shutdownOutput() {
return shutdownOutput(newPromise());
}
@Override
public ChannelFuture shutdownInput() {
return shutdownInput(newPromise());
}
@Override
public ChannelFuture shutdown() {
return shutdown(newPromise());
}
@Override
protected int doReadBytes(ByteBuf buf) throws Exception {
if (socket.isClosed()) {
return -1;
}
try {
return super.doReadBytes(buf);
} catch (SocketTimeoutException ignored) {
return 0;
}
}
@Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdownOutput0(promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
}
return promise;
}
private void shutdownOutput0(ChannelPromise promise) {
try {
shutdownOutput0();
promise.setSuccess();
} catch (Throwable t) {
promise.setFailure(t);
}
}
private void shutdownOutput0() throws IOException {
socket.shutdownOutput();
}
@Override
public ChannelFuture shutdownInput(final ChannelPromise promise) {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdownInput0(promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
shutdownInput0(promise);
}
});
}
return promise;
}
private void shutdownInput0(ChannelPromise promise) {
try {
socket.shutdownInput();
promise.setSuccess();
} catch (Throwable t) {
promise.setFailure(t);
}
}
@Override
public ChannelFuture shutdown(final ChannelPromise promise) {
ChannelFuture shutdownOutputFuture = shutdownOutput();
if (shutdownOutputFuture.isDone()) {
shutdownOutputDone(shutdownOutputFuture, promise);
} else {
shutdownOutputFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
shutdownOutputDone(shutdownOutputFuture, promise);
}
});
}
return promise;
}
private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
ChannelFuture shutdownInputFuture = shutdownInput();
if (shutdownInputFuture.isDone()) {
shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
} else {
shutdownInputFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
}
});
}
}
private static void shutdownDone(ChannelFuture shutdownOutputFuture,
ChannelFuture shutdownInputFuture,
ChannelPromise promise) {
Throwable shutdownOutputCause = shutdownOutputFuture.cause();
Throwable shutdownInputCause = shutdownInputFuture.cause();
if (shutdownOutputCause != null) {
if (shutdownInputCause != null) {
logger.debug("Exception suppressed because a previous exception occurred.",
shutdownInputCause);
}
promise.setFailure(shutdownOutputCause);
} else if (shutdownInputCause != null) {
promise.setFailure(shutdownInputCause);
} else {
promise.setSuccess();
}
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
}
@Override
protected SocketAddress localAddress0() {
return socket.getLocalSocketAddress();
}
@Override
protected SocketAddress remoteAddress0() {
return socket.getRemoteSocketAddress();
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
SocketUtils.bind(socket, localAddress);
}
@Override
protected void doConnect(SocketAddress remoteAddress,
SocketAddress localAddress) throws Exception {
if (localAddress != null) {
SocketUtils.bind(socket, localAddress);
}
boolean success = false;
try {
SocketUtils.connect(socket, remoteAddress, config().getConnectTimeoutMillis());
activate(socket.getInputStream(), socket.getOutputStream());
success = true;
} catch (SocketTimeoutException e) {
ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress);
cause.setStackTrace(e.getStackTrace());
throw cause;
} finally {
if (!success) {
doClose();
}
}
}
@Override
protected void doDisconnect() throws Exception {
doClose();
}
@Override
protected void doClose() throws Exception {
socket.close();
}
protected boolean checkInputShutdown() {
if (isInputShutdown()) {
try {
Thread.sleep(config().getSoTimeout());
} catch (Throwable e) {
// ignore
}
return true;
}
return false;
}
@Deprecated
@Override
protected void setReadPending(boolean readPending) {
super.setReadPending(readPending);
}
final void clearReadPending0() {
clearReadPending();
}
}

View File

@ -1,118 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.SocketChannelConfig;
/**
* A {@link ChannelConfig} for a {@link OioSocketChannel}.
*
* <h3>Available options</h3>
*
* In addition to the options provided by {@link SocketChannelConfig},
* {@link OioSocketChannelConfig} allows the following options in the
* option map:
*
* <table border="1" cellspacing="0" cellpadding="6">
* <tr>
* <th>Name</th><th>Associated setter method</th>
* </tr><tr>
* <td>{@link ChannelOption#SO_TIMEOUT}</td><td>{@link #setSoTimeout(int)}</td>
* </tr>
* </table>
*
* @deprecated use NIO / EPOLL / KQUEUE transport.
*/
@Deprecated
public interface OioSocketChannelConfig extends SocketChannelConfig {
/**
* Sets the maximal time a operation on the underlying socket may block.
*/
OioSocketChannelConfig setSoTimeout(int timeout);
/**
* Returns the maximal time a operation on the underlying socket may block.
*/
int getSoTimeout();
@Override
OioSocketChannelConfig setTcpNoDelay(boolean tcpNoDelay);
@Override
OioSocketChannelConfig setSoLinger(int soLinger);
@Override
OioSocketChannelConfig setSendBufferSize(int sendBufferSize);
@Override
OioSocketChannelConfig setReceiveBufferSize(int receiveBufferSize);
@Override
OioSocketChannelConfig setKeepAlive(boolean keepAlive);
@Override
OioSocketChannelConfig setTrafficClass(int trafficClass);
@Override
OioSocketChannelConfig setReuseAddress(boolean reuseAddress);
@Override
OioSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth);
@Override
OioSocketChannelConfig setAllowHalfClosure(boolean allowHalfClosure);
@Override
OioSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override
@Deprecated
OioSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override
OioSocketChannelConfig setWriteSpinCount(int writeSpinCount);
@Override
OioSocketChannelConfig setAllocator(ByteBufAllocator allocator);
@Override
OioSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator);
@Override
OioSocketChannelConfig setAutoRead(boolean autoRead);
@Override
OioSocketChannelConfig setAutoClose(boolean autoClose);
@Override
OioSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
@Override
OioSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
@Override
OioSocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark);
@Override
OioSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);
}

View File

@ -1,24 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Old blocking I/O based socket channel API implementation - recommended for
* a small number of connections (&lt; 1000).
*
* @deprecated use NIO / EPOLL / KQUEUE transport.
*/
@Deprecated
package io.netty.channel.socket.oio;

View File

@ -26,9 +26,7 @@ import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.socket.oio.OioSocketChannel;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
@ -993,11 +991,6 @@ public class DefaultChannelPipelineTest {
testAddInListener(new NioSocketChannel(), new NioEventLoopGroup(1));
}
@Test(timeout = 3000)
public void testAddInListenerOio() throws Throwable {
testAddInListener(new OioSocketChannel(), new OioEventLoopGroup(1));
}
@Test(timeout = 3000)
public void testAddInListenerLocal() throws Throwable {
testAddInListener(new LocalChannel(), new DefaultEventLoopGroup(1));

View File

@ -1,114 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import org.junit.Test;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
public class ThreadPerChannelEventLoopGroupTest {
private static final ChannelHandler NOOP_HANDLER = new ChannelHandlerAdapter() {
@Override
public boolean isSharable() {
return true;
}
};
@Test
public void testTerminationFutureSuccessInLog() throws Exception {
for (int i = 0; i < 2; i++) {
ThreadPerChannelEventLoopGroup loopGroup = new ThreadPerChannelEventLoopGroup(64);
runTest(loopGroup);
}
}
@Test
public void testTerminationFutureSuccessReflectively() throws Exception {
Field terminationFutureField =
ThreadPerChannelEventLoopGroup.class.getDeclaredField("terminationFuture");
terminationFutureField.setAccessible(true);
final Exception[] exceptionHolder = new Exception[1];
for (int i = 0; i < 2; i++) {
ThreadPerChannelEventLoopGroup loopGroup = new ThreadPerChannelEventLoopGroup(64);
Promise<?> promise = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE) {
@Override
public Promise<Void> setSuccess(Void result) {
try {
return super.setSuccess(result);
} catch (IllegalStateException e) {
exceptionHolder[0] = e;
throw e;
}
}
};
terminationFutureField.set(loopGroup, promise);
runTest(loopGroup);
}
// The global event executor will not terminate, but this will give the test a chance to fail.
GlobalEventExecutor.INSTANCE.awaitTermination(100, TimeUnit.MILLISECONDS);
assertNull(exceptionHolder[0]);
}
private static void runTest(ThreadPerChannelEventLoopGroup loopGroup) throws InterruptedException {
int taskCount = 100;
EventExecutor testExecutor = new TestEventExecutor();
ChannelGroup channelGroup = new DefaultChannelGroup(testExecutor);
while (taskCount-- > 0) {
Channel channel = new EmbeddedChannel(NOOP_HANDLER);
loopGroup.register(new DefaultChannelPromise(channel, testExecutor));
channelGroup.add(channel);
}
channelGroup.close().sync();
loopGroup.shutdownGracefully(100, 200, TimeUnit.MILLISECONDS).sync();
assertTrue(loopGroup.isTerminated());
}
private static class TestEventExecutor extends SingleThreadEventExecutor {
TestEventExecutor() {
super(null, new DefaultThreadFactory("test"), false);
}
@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
}
}
}
}

View File

@ -1,115 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.oio;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import io.netty.channel.socket.oio.OioSocketChannel;
import io.netty.util.NetUtil;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
public class OioEventLoopTest {
@Test
public void testTooManyServerChannels() throws Exception {
EventLoopGroup g = new OioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.channel(OioServerSocketChannel.class);
b.group(g);
b.childHandler(new ChannelInboundHandlerAdapter());
ChannelFuture f1 = b.bind(0);
f1.sync();
ChannelFuture f2 = b.bind(0);
f2.await();
assertThat(f2.cause(), is(instanceOf(ChannelException.class)));
assertThat(f2.cause().getMessage().toLowerCase(), containsString("too many channels"));
final CountDownLatch notified = new CountDownLatch(1);
f2.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
notified.countDown();
}
});
notified.await();
g.shutdownGracefully();
}
@Test
public void testTooManyClientChannels() throws Exception {
EventLoopGroup g = new OioEventLoopGroup(1);
ServerBootstrap sb = new ServerBootstrap();
sb.channel(OioServerSocketChannel.class);
sb.group(g);
sb.childHandler(new ChannelInboundHandlerAdapter());
ChannelFuture f1 = sb.bind(0);
f1.sync();
Bootstrap cb = new Bootstrap();
cb.channel(OioSocketChannel.class);
cb.group(g);
cb.handler(new ChannelInboundHandlerAdapter());
ChannelFuture f2 = cb.connect(NetUtil.LOCALHOST, ((InetSocketAddress) f1.channel().localAddress()).getPort());
f2.await();
assertThat(f2.cause(), is(instanceOf(ChannelException.class)));
assertThat(f2.cause().getMessage().toLowerCase(), containsString("too many channels"));
final CountDownLatch notified = new CountDownLatch(1);
f2.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
notified.countDown();
}
});
notified.await();
g.shutdownGracefully();
}
@Test
public void testTooManyAcceptedChannels() throws Exception {
EventLoopGroup g = new OioEventLoopGroup(1);
ServerBootstrap sb = new ServerBootstrap();
sb.channel(OioServerSocketChannel.class);
sb.group(g);
sb.childHandler(new ChannelInboundHandlerAdapter());
ChannelFuture f1 = sb.bind(0);
f1.sync();
Socket s = new Socket(NetUtil.LOCALHOST, ((InetSocketAddress) f1.channel().localAddress()).getPort());
assertThat(s.getInputStream().read(), is(-1));
s.close();
g.shutdownGracefully();
}
}