diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/DatagramConnectNotExistsTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/DatagramConnectNotExistsTest.java new file mode 100644 index 0000000000..8719918862 --- /dev/null +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/DatagramConnectNotExistsTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.socket.oio.OioDatagramChannel; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.util.CharsetUtil; +import io.netty.util.NetUtil; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.Promise; +import org.junit.Assert; +import org.junit.Test; + +import java.net.PortUnreachableException; +import java.util.List; + +public class DatagramConnectNotExistsTest extends AbstractClientSocketTest { + + @Override + protected List> newFactories() { + return SocketTestPermutation.INSTANCE.datagramSocket(); + } + + @Test(timeout = 10000) + public void testConnectNotExists() throws Throwable { + run(); + } + + public void testConnectNotExists(Bootstrap cb) throws Throwable { + final Promise promise = ImmediateEventExecutor.INSTANCE.newPromise(); + cb.handler(new ChannelInboundHandlerAdapter() { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + promise.trySuccess(cause); + } + }); + ChannelFuture future = cb.connect(NetUtil.LOCALHOST, SocketTestPermutation.BAD_PORT); + try { + Channel datagramChannel = future.syncUninterruptibly().channel(); + Assert.assertTrue(datagramChannel.isActive()); + datagramChannel.writeAndFlush( + Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII)).syncUninterruptibly(); + if (!(datagramChannel instanceof OioDatagramChannel)) { + Assert.assertTrue(promise.syncUninterruptibly().getNow() instanceof PortUnreachableException); + } + } finally { + future.channel().close(); + } + } +} diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java index 8c69373bad..26491b56a4 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java @@ -20,14 +20,17 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramPacket; import org.junit.Test; import java.net.InetSocketAddress; +import java.nio.channels.NotYetConnectedException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -129,29 +132,20 @@ public class DatagramUnicastTest extends AbstractDatagramTest { assertTrue(buf.release()); } + @Test + public void testSimpleSendWithConnect() throws Throwable { + run(); + } + + public void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb) throws Throwable { + testSimpleSendWithConnect(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), BYTES, 1); + testSimpleSendWithConnect(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), BYTES, 4); + } + @SuppressWarnings("deprecation") private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient, final byte[] bytes, int count, WrapType wrapType) throws Throwable { - final CountDownLatch latch = new CountDownLatch(count); - - sb.handler(new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) throws Exception { - ch.pipeline().addLast(new SimpleChannelInboundHandler() { - @Override - public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception { - ByteBuf buf = msg.content(); - assertEquals(bytes.length, buf.readableBytes()); - for (byte b: bytes) { - assertEquals(b, buf.readByte()); - } - latch.countDown(); - } - }); - } - }); - cb.handler(new SimpleChannelInboundHandler() { @Override public void channelRead0(ChannelHandlerContext ctx, Object msgs) throws Exception { @@ -159,7 +153,9 @@ public class DatagramUnicastTest extends AbstractDatagramTest { } }); - Channel sc = sb.bind(newSocketAddress()).sync().channel(); + final CountDownLatch latch = new CountDownLatch(count); + Channel sc = setupServerChannel(sb, bytes, latch); + Channel cc; if (bindClient) { cc = cb.bind(newSocketAddress()).sync().channel(); @@ -167,15 +163,14 @@ public class DatagramUnicastTest extends AbstractDatagramTest { cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true); cc = cb.register().sync().channel(); } - InetSocketAddress addr = (InetSocketAddress) sc.localAddress(); for (int i = 0; i < count; i++) { switch (wrapType) { case DUP: - cc.write(new DatagramPacket(buf.retain().duplicate(), addr)); + cc.write(new DatagramPacket(buf.retainedDuplicate(), addr)); break; case SLICE: - cc.write(new DatagramPacket(buf.retain().slice(), addr)); + cc.write(new DatagramPacket(buf.retainedSlice(), addr)); break; case READ_ONLY: cc.write(new DatagramPacket(buf.retain().asReadOnly(), addr)); @@ -195,4 +190,90 @@ public class DatagramUnicastTest extends AbstractDatagramTest { sc.close().sync(); cc.close().sync(); } + + private void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb, ByteBuf buf, final byte[] bytes, int count) + throws Throwable { + for (WrapType type: WrapType.values()) { + testSimpleSendWithConnect0(sb, cb, buf.retain(), bytes, count, type); + } + assertTrue(buf.release()); + } + + private void testSimpleSendWithConnect0(Bootstrap sb, Bootstrap cb, ByteBuf buf, final byte[] bytes, int count, + WrapType wrapType) throws Throwable { + cb.handler(new SimpleChannelInboundHandler() { + @Override + public void channelRead0(ChannelHandlerContext ctx, Object msgs) throws Exception { + // Nothing will be sent. + } + }); + + final CountDownLatch latch = new CountDownLatch(count); + Channel sc = setupServerChannel(sb, bytes, latch); + DatagramChannel cc = null; + try { + cc = (DatagramChannel) cb.connect(sc.localAddress()).sync().channel(); + + for (int i = 0; i < count; i++) { + switch (wrapType) { + case DUP: + cc.write(buf.retainedDuplicate()); + break; + case SLICE: + cc.write(buf.retainedSlice()); + break; + case READ_ONLY: + cc.write(buf.retain().asReadOnly()); + break; + case NONE: + cc.write(buf.retain()); + break; + default: + throw new Error("unknown wrap type: " + wrapType); + } + } + cc.flush(); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + assertTrue(cc.isConnected()); + + // Test what happens when we call disconnect() + cc.disconnect().syncUninterruptibly(); + assertFalse(cc.isConnected()); + + ChannelFuture future = cc.writeAndFlush( + buf.retain().duplicate()).awaitUninterruptibly(); + assertTrue(future.cause() instanceof NotYetConnectedException); + } finally { + // release as we used buf.retain() before + buf.release(); + + sc.close().sync(); + if (cc != null) { + cc.close().sync(); + } + } + } + + @SuppressWarnings("deprecation") + private Channel setupServerChannel(Bootstrap sb, final byte[] bytes, final CountDownLatch latch) + throws Throwable { + sb.handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(new SimpleChannelInboundHandler() { + @Override + public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception { + ByteBuf buf = msg.content(); + assertEquals(bytes.length, buf.readableBytes()); + for (byte b : bytes) { + assertEquals(b, buf.readByte()); + } + latch.countDown(); + } + }); + } + }); + return sb.bind(newSocketAddress()).sync().channel(); + } } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java index b4a9ceb86b..deecd521bc 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java @@ -16,7 +16,6 @@ package io.netty.testsuite.transport.socket; import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; @@ -26,8 +25,6 @@ import io.netty.util.internal.SocketUtils; import io.netty.util.NetUtil; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.Promise; -import io.netty.util.internal.SystemPropertyUtil; -import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import org.junit.Test; @@ -38,21 +35,14 @@ import java.net.Socket; import static org.hamcrest.CoreMatchers.*; import static org.junit.Assert.*; import static org.junit.Assume.*; +import static io.netty.testsuite.transport.socket.SocketTestPermutation.BAD_HOST; +import static io.netty.testsuite.transport.socket.SocketTestPermutation.BAD_PORT; public class SocketConnectionAttemptTest extends AbstractClientSocketTest { - private static final String BAD_HOST = SystemPropertyUtil.get("io.netty.testsuite.badHost", "netty.io"); - private static final int BAD_PORT = SystemPropertyUtil.getInt("io.netty.testsuite.badPort", 65535); - // See /etc/services private static final int UNASSIGNED_PORT = 4; - static { - InternalLogger logger = InternalLoggerFactory.getInstance(SocketConnectionAttemptTest.class); - logger.debug("-Dio.netty.testsuite.badHost: {}", BAD_HOST); - logger.debug("-Dio.netty.testsuite.badPort: {}", BAD_PORT); - } - @Test(timeout = 30000) public void testConnectTimeout() throws Throwable { run(); @@ -123,7 +113,8 @@ public class SocketConnectionAttemptTest extends AbstractClientSocketTest { } } - assumeThat("The connection attempt to " + BAD_HOST + " does not time out.", badHostTimedOut, is(true)); + assumeThat("The connection attempt to " + BAD_HOST + " does not time out.", + badHostTimedOut, is(true)); run(); } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java index 730f1787de..686746fb1c 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java @@ -34,6 +34,9 @@ import io.netty.channel.socket.oio.OioSocketChannel; import io.netty.testsuite.transport.TestsuitePermutation.BootstrapComboFactory; import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory; import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.internal.SystemPropertyUtil; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.ArrayList; import java.util.Arrays; @@ -41,6 +44,15 @@ import java.util.List; public class SocketTestPermutation { + static final String BAD_HOST = SystemPropertyUtil.get("io.netty.testsuite.badHost", "netty.io"); + static final int BAD_PORT = SystemPropertyUtil.getInt("io.netty.testsuite.badPort", 65535); + + static { + InternalLogger logger = InternalLoggerFactory.getInstance(SocketConnectionAttemptTest.class); + logger.debug("-Dio.netty.testsuite.badHost: {}", BAD_HOST); + logger.debug("-Dio.netty.testsuite.badPort: {}", BAD_PORT); + } + static final SocketTestPermutation INSTANCE = new SocketTestPermutation(); protected static final int BOSSES = 2; @@ -168,4 +180,22 @@ public class SocketTestPermutation { } ); } + + public List> datagramSocket() { + return Arrays.asList( + new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(nioWorkerGroup).channel(NioDatagramChannel.class); + } + }, + new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(oioWorkerGroup).channel(OioDatagramChannel.class) + .option(ChannelOption.SO_TIMEOUT, OIO_SO_TIMEOUT); + } + } + ); + } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index b7498dcb3a..352a74bbb1 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -23,7 +23,11 @@ import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelPromise; +import io.netty.channel.ConnectTimeoutException; import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.ChannelInputShutdownEvent; @@ -32,19 +36,40 @@ import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.Socket; import io.netty.channel.unix.UnixChannel; import io.netty.util.ReferenceCountUtil; +import io.netty.util.internal.ThrowableUtil; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.AlreadyConnectedException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.ConnectionPendingException; import java.nio.channels.NotYetConnectedException; import java.nio.channels.UnresolvedAddressException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr; import static io.netty.util.internal.ObjectUtil.checkNotNull; abstract class AbstractEpollChannel extends AbstractChannel implements UnixChannel { + private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( + new ClosedChannelException(), AbstractEpollChannel.class, "doClose()"); private static final ChannelMetadata METADATA = new ChannelMetadata(false); private final int readFlag; final LinuxSocket socket; + /** + * The future of the current connection attempt. If not null, subsequent + * connection attempts will fail. + */ + private ChannelPromise connectPromise; + private ScheduledFuture connectTimeoutFuture; + private SocketAddress requestedRemoteAddress; + + private volatile SocketAddress local; + private volatile SocketAddress remote; + protected int flags = Native.EPOLLET; boolean inputClosedSeenErrorOnRead; boolean epollInReadyRunnablePending; @@ -61,6 +86,24 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann readFlag = flag; flags |= flag; this.active = active; + if (active) { + // Directly cache the remote and local addresses + // See https://github.com/netty/netty/issues/2359 + local = fd.localAddress(); + remote = fd.remoteAddress(); + } + } + + AbstractEpollChannel(Channel parent, LinuxSocket fd, int flag, SocketAddress remote) { + super(parent); + socket = checkNotNull(fd, "fd"); + readFlag = flag; + flags |= flag; + active = true; + // Directly cache the remote and local addresses + // See https://github.com/netty/netty/issues/2359 + this.remote = remote; + local = fd.localAddress(); } static boolean isSoErrorZero(Socket fd) { @@ -114,6 +157,19 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // socket which has not even been connected yet. This has been observed to block during unit tests. inputClosedSeenErrorOnRead = true; try { + ChannelPromise promise = connectPromise; + if (promise != null) { + // Use tryFailure() instead of setFailure() to avoid the race against cancel(). + promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION); + connectPromise = null; + } + + ScheduledFuture future = connectTimeoutFuture; + if (future != null) { + future.cancel(false); + connectTimeoutFuture = null; + } + if (isRegistered()) { doDeregister(); } @@ -472,12 +528,14 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann /** * Called once a EPOLLOUT event is ready to be processed */ - void epollOutReady() { - if (socket.isOutputShutdown()) { - return; + final void epollOutReady() { + if (connectPromise != null) { + // pending connect which is now complete so handle it. + finishConnect(); + } else if (!socket.isOutputShutdown()) { + // directly call super.flush0() to force a flush now + super.flush0(); } - // directly call super.flush0() to force a flush now - super.flush0(); } protected final void clearEpollIn0() { @@ -492,5 +550,213 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann unsafe().close(unsafe().voidPromise()); } } + + @Override + public void connect( + final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { + if (!promise.setUncancellable() || !ensureOpen(promise)) { + return; + } + + try { + if (connectPromise != null) { + throw new ConnectionPendingException(); + } + + boolean wasActive = isActive(); + if (doConnect(remoteAddress, localAddress)) { + fulfillConnectPromise(promise, wasActive); + } else { + connectPromise = promise; + requestedRemoteAddress = remoteAddress; + + // Schedule connect timeout. + int connectTimeoutMillis = config().getConnectTimeoutMillis(); + if (connectTimeoutMillis > 0) { + connectTimeoutFuture = eventLoop().schedule(new Runnable() { + @Override + public void run() { + ChannelPromise connectPromise = AbstractEpollChannel.this.connectPromise; + ConnectTimeoutException cause = + new ConnectTimeoutException("connection timed out: " + remoteAddress); + if (connectPromise != null && connectPromise.tryFailure(cause)) { + close(voidPromise()); + } + } + }, connectTimeoutMillis, TimeUnit.MILLISECONDS); + } + + promise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isCancelled()) { + if (connectTimeoutFuture != null) { + connectTimeoutFuture.cancel(false); + } + connectPromise = null; + close(voidPromise()); + } + } + }); + } + } catch (Throwable t) { + closeIfClosed(); + promise.tryFailure(annotateConnectException(t, remoteAddress)); + } + } + + private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { + if (promise == null) { + // Closed via cancellation and the promise has been notified already. + return; + } + active = true; + + // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel. + // We still need to ensure we call fireChannelActive() in this case. + boolean active = isActive(); + + // trySuccess() will return false if a user cancelled the connection attempt. + boolean promiseSet = promise.trySuccess(); + + // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, + // because what happened is what happened. + if (!wasActive && active) { + pipeline().fireChannelActive(); + } + + // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). + if (!promiseSet) { + close(voidPromise()); + } + } + + private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) { + if (promise == null) { + // Closed via cancellation and the promise has been notified already. + return; + } + + // Use tryFailure() instead of setFailure() to avoid the race against cancel(). + promise.tryFailure(cause); + closeIfClosed(); + } + + private void finishConnect() { + // Note this method is invoked by the event loop only if the connection attempt was + // neither cancelled nor timed out. + + assert eventLoop().inEventLoop(); + + boolean connectStillInProgress = false; + try { + boolean wasActive = isActive(); + if (!doFinishConnect()) { + connectStillInProgress = true; + return; + } + fulfillConnectPromise(connectPromise, wasActive); + } catch (Throwable t) { + fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); + } finally { + if (!connectStillInProgress) { + // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used + // See https://github.com/netty/netty/issues/1770 + if (connectTimeoutFuture != null) { + connectTimeoutFuture.cancel(false); + } + connectPromise = null; + } + } + } + + /** + * Finish the connect + */ + private boolean doFinishConnect() throws Exception { + if (socket.finishConnect()) { + clearFlag(Native.EPOLLOUT); + if (requestedRemoteAddress instanceof InetSocketAddress) { + remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress()); + } + requestedRemoteAddress = null; + + return true; + } + setFlag(Native.EPOLLOUT); + return false; + } + } + + @Override + protected void doBind(SocketAddress local) throws Exception { + if (local instanceof InetSocketAddress) { + checkResolvable((InetSocketAddress) local); + } + socket.bind(local); + this.local = socket.localAddress(); + } + + /** + * Connect to the remote peer + */ + protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + if (localAddress instanceof InetSocketAddress) { + checkResolvable((InetSocketAddress) localAddress); + } + + InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress + ? (InetSocketAddress) remoteAddress : null; + if (remoteSocketAddr != null) { + checkResolvable(remoteSocketAddr); + } + + if (remote != null) { + // Check if already connected before trying to connect. This is needed as connect(...) will not return -1 + // and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished + // later. + throw new AlreadyConnectedException(); + } + + if (localAddress != null) { + socket.bind(localAddress); + } + + boolean connected = doConnect0(remoteAddress); + if (connected) { + remote = remoteSocketAddr == null ? + remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress()); + } + // We always need to set the localAddress even if not connected yet as the bind already took place. + // + // See https://github.com/netty/netty/issues/3463 + local = socket.localAddress(); + return connected; + } + + private boolean doConnect0(SocketAddress remote) throws Exception { + boolean success = false; + try { + boolean connected = socket.connect(remote); + if (!connected) { + setFlag(Native.EPOLLOUT); + } + success = true; + return connected; + } finally { + if (!success) { + doClose(); + } + } + } + + @Override + protected SocketAddress localAddress0() { + return local; + } + + @Override + protected SocketAddress remoteAddress0() { + return remote; } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java index 798878f1dd..ebda9ebfe7 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java @@ -134,4 +134,9 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im } } } + + @Override + protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + throw new UnsupportedOperationException(); + } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 2ca159846a..cc4ed88f44 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -25,7 +25,6 @@ import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.channel.ConnectTimeoutException; import io.netty.channel.DefaultFileRegion; import io.netty.channel.EventLoop; import io.netty.channel.FileRegion; @@ -45,12 +44,9 @@ import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; -import java.nio.channels.ConnectionPendingException; import java.nio.channels.WritableByteChannel; import java.util.Queue; import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import static io.netty.channel.unix.FileDescriptor.pipe; import static io.netty.util.internal.ObjectUtil.checkNotNull; @@ -61,8 +57,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + StringUtil.simpleClassName(DefaultFileRegion.class) + ')'; private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class); - private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( - new ClosedChannelException(), AbstractEpollStreamChannel.class, "doClose()"); private static final ClosedChannelException CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(new ClosedChannelException(), AbstractEpollStreamChannel.class, "clearSpliceQueue()"); @@ -73,13 +67,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im ThrowableUtil.unknownStackTrace(new ClosedChannelException(), AbstractEpollStreamChannel.class, "failSpliceIfClosed(...)"); - /** - * The future of the current connection attempt. If not null, subsequent - * connection attempts will fail. - */ - private ChannelPromise connectPromise; - private ScheduledFuture connectTimeoutFuture; - private SocketAddress requestedRemoteAddress; private Queue spliceQueue; // Lazy init these if we need to splice(...) @@ -106,6 +93,12 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im flags |= Native.EPOLLRDHUP; } + AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) { + super(parent, fd, Native.EPOLLIN, remote); + // Add EPOLLRDHUP so we are notified once the remote peer close the connection. + flags |= Native.EPOLLRDHUP; + } + protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) { super(null, fd, Native.EPOLLIN, active); // Add EPOLLRDHUP so we are notified once the remote peer close the connection. @@ -687,19 +680,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im @Override protected void doClose() throws Exception { try { - ChannelPromise promise = connectPromise; - if (promise != null) { - // Use tryFailure() instead of setFailure() to avoid the race against cancel(). - promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION); - connectPromise = null; - } - - ScheduledFuture future = connectTimeoutFuture; - if (future != null) { - future.cancel(false); - connectTimeoutFuture = null; - } - // Calling super.doClose() first so splceTo(...) will fail on next call. + // Calling super.doClose() first so spliceTo(...) will fail on next call. super.doClose(); } finally { safeClosePipe(pipeIn); @@ -721,29 +702,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im } } - /** - * Connect to the remote peer - */ - protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { - if (localAddress != null) { - socket.bind(localAddress); - } - - boolean success = false; - try { - boolean connected = socket.connect(remoteAddress); - if (!connected) { - setFlag(Native.EPOLLOUT); - } - success = true; - return connected; - } finally { - if (!success) { - doClose(); - } - } - } - private static void safeClosePipe(FileDescriptor fd) { if (fd != null) { try { @@ -781,148 +739,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im } } - @Override - public void connect( - final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { - if (!promise.setUncancellable() || !ensureOpen(promise)) { - return; - } - - try { - if (connectPromise != null) { - throw new ConnectionPendingException(); - } - - boolean wasActive = isActive(); - if (doConnect(remoteAddress, localAddress)) { - fulfillConnectPromise(promise, wasActive); - } else { - connectPromise = promise; - requestedRemoteAddress = remoteAddress; - - // Schedule connect timeout. - int connectTimeoutMillis = config().getConnectTimeoutMillis(); - if (connectTimeoutMillis > 0) { - connectTimeoutFuture = eventLoop().schedule(new Runnable() { - @Override - public void run() { - ChannelPromise connectPromise = AbstractEpollStreamChannel.this.connectPromise; - ConnectTimeoutException cause = - new ConnectTimeoutException("connection timed out: " + remoteAddress); - if (connectPromise != null && connectPromise.tryFailure(cause)) { - close(voidPromise()); - } - } - }, connectTimeoutMillis, TimeUnit.MILLISECONDS); - } - - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isCancelled()) { - if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); - } - connectPromise = null; - close(voidPromise()); - } - } - }); - } - } catch (Throwable t) { - closeIfClosed(); - promise.tryFailure(annotateConnectException(t, remoteAddress)); - } - } - - private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { - if (promise == null) { - // Closed via cancellation and the promise has been notified already. - return; - } - active = true; - - // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel. - // We still need to ensure we call fireChannelActive() in this case. - boolean active = isActive(); - - // trySuccess() will return false if a user cancelled the connection attempt. - boolean promiseSet = promise.trySuccess(); - - // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, - // because what happened is what happened. - if (!wasActive && active) { - pipeline().fireChannelActive(); - } - - // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). - if (!promiseSet) { - close(voidPromise()); - } - } - - private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) { - if (promise == null) { - // Closed via cancellation and the promise has been notified already. - return; - } - - // Use tryFailure() instead of setFailure() to avoid the race against cancel(). - promise.tryFailure(cause); - closeIfClosed(); - } - - private void finishConnect() { - // Note this method is invoked by the event loop only if the connection attempt was - // neither cancelled nor timed out. - - assert eventLoop().inEventLoop(); - - boolean connectStillInProgress = false; - try { - boolean wasActive = isActive(); - if (!doFinishConnect()) { - connectStillInProgress = true; - return; - } - fulfillConnectPromise(connectPromise, wasActive); - } catch (Throwable t) { - fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); - } finally { - if (!connectStillInProgress) { - // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used - // See https://github.com/netty/netty/issues/1770 - if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); - } - connectPromise = null; - } - } - } - - @Override - void epollOutReady() { - if (connectPromise != null) { - // pending connect which is now complete so handle it. - finishConnect(); - } else { - super.epollOutReady(); - } - } - - /** - * Finish the connect - */ - boolean doFinishConnect() throws Exception { - if (socket.finishConnect()) { - clearFlag(Native.EPOLLOUT); - return true; - } else { - setFlag(Native.EPOLLOUT); - return false; - } - } - @Override EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) { return new EpollRecvByteAllocatorStreamingHandle(handle); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 0f693bd4af..eca49d5fa1 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -30,7 +30,6 @@ import io.netty.channel.socket.DatagramPacket; import io.netty.channel.unix.DatagramSocketAddress; import io.netty.channel.unix.IovArray; import io.netty.channel.unix.UnixChannelUtil; -import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import java.io.IOException; @@ -40,7 +39,6 @@ import java.net.NetworkInterface; import java.net.SocketAddress; import java.net.SocketException; import java.nio.ByteBuffer; -import java.nio.channels.NotYetConnectedException; import java.util.ArrayList; import java.util.List; @@ -59,10 +57,8 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements StringUtil.simpleClassName(InetSocketAddress.class) + ">, " + StringUtil.simpleClassName(ByteBuf.class) + ')'; - private volatile InetSocketAddress local; - private volatile InetSocketAddress remote; - private volatile boolean connected; private final EpollDatagramChannelConfig config; + private volatile boolean connected; public EpollDatagramChannel() { super(newSocketDgram(), Native.EPOLLIN); @@ -75,9 +71,6 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements EpollDatagramChannel(LinuxSocket fd) { super(null, fd, Native.EPOLLIN, true); - // As we create an EpollDatagramChannel from a FileDescriptor we should try to obtain the remote and local - // address from it. This is needed as the FileDescriptor may be bound already. - local = fd.localAddress(); config = new EpollDatagramChannelConfig(this); } @@ -117,8 +110,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements try { return joinGroup( multicastAddress, - NetworkInterface.getByInetAddress(localAddress().getAddress()), - null, promise); + NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise); } catch (SocketException e) { promise.setFailure(e); } @@ -261,22 +253,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements return new EpollDatagramChannelUnsafe(); } - @Override - protected InetSocketAddress localAddress0() { - return local; - } - - @Override - protected InetSocketAddress remoteAddress0() { - return remote; - } - @Override protected void doBind(SocketAddress localAddress) throws Exception { - InetSocketAddress addr = (InetSocketAddress) localAddress; - checkResolvable(addr); - socket.bind(addr); - local = socket.localAddress(); + super.doBind(localAddress); active = true; } @@ -360,30 +339,35 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements return true; } - if (remoteAddress == null) { - remoteAddress = remote; - if (remoteAddress == null) { - throw new NotYetConnectedException(); - } - } - - final int writtenBytes; + final long writtenBytes; if (data.hasMemoryAddress()) { long memoryAddress = data.memoryAddress(); - writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(), - remoteAddress.getAddress(), remoteAddress.getPort()); + if (remoteAddress == null) { + writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex()); + } else { + writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(), + remoteAddress.getAddress(), remoteAddress.getPort()); + } } else if (data.nioBufferCount() > 1) { IovArray array = ((EpollEventLoop) eventLoop()).cleanArray(); array.add(data); int cnt = array.count(); assert cnt != 0; - writtenBytes = socket.sendToAddresses(array.memoryAddress(0), - cnt, remoteAddress.getAddress(), remoteAddress.getPort()); + if (remoteAddress == null) { + writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt); + } else { + writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt, + remoteAddress.getAddress(), remoteAddress.getPort()); + } } else { ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes()); - writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(), - remoteAddress.getAddress(), remoteAddress.getPort()); + if (remoteAddress == null) { + writtenBytes = socket.write(nioData, nioData.position(), nioData.limit()); + } else { + writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(), + remoteAddress.getAddress(), remoteAddress.getPort()); + } } return writtenBytes > 0; @@ -427,48 +411,27 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements @Override protected void doDisconnect() throws Exception { + socket.disconnect(); + connected = active = false; + } + + @Override + protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + if (super.doConnect(remoteAddress, localAddress)) { + connected = true; + return true; + } + return false; + } + + @Override + protected void doClose() throws Exception { + super.doClose(); connected = false; } final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe { - @Override - public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) { - boolean success = false; - try { - try { - boolean wasActive = isActive(); - InetSocketAddress remoteAddress = (InetSocketAddress) remote; - if (local != null) { - InetSocketAddress localAddress = (InetSocketAddress) local; - doBind(localAddress); - } - - checkResolvable(remoteAddress); - EpollDatagramChannel.this.remote = remoteAddress; - EpollDatagramChannel.this.local = socket.localAddress(); - success = true; - - // First notify the promise before notifying the handler. - channelPromise.trySuccess(); - - // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, - // because what happened is what happened. - if (!wasActive && isActive()) { - pipeline().fireChannelActive(); - } - } finally { - if (!success) { - doClose(); - } else { - connected = true; - } - } - } catch (Throwable cause) { - channelPromise.tryFailure(cause); - } - } - @Override void epollInReady() { assert eventLoop().inEventLoop(); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java index f0f87919b1..5ee5a19622 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java @@ -37,7 +37,6 @@ import static io.netty.channel.unix.NativeInetAddress.address; public final class EpollServerSocketChannel extends AbstractEpollServerChannel implements ServerSocketChannel { private final EpollServerSocketChannelConfig config; - private volatile InetSocketAddress local; private volatile Collection tcpMd5SigAddresses = Collections.emptyList(); public EpollServerSocketChannel() { @@ -53,17 +52,11 @@ public final class EpollServerSocketChannel extends AbstractEpollServerChannel i EpollServerSocketChannel(LinuxSocket fd) { super(fd); - // As we create an EpollServerSocketChannel from a FileDescriptor we should try to obtain the remote and local - // address from it. This is needed as the FileDescriptor may be bound already. - local = fd.localAddress(); config = new EpollServerSocketChannelConfig(this); } EpollServerSocketChannel(LinuxSocket fd, boolean active) { super(fd, active); - // As we create an EpollServerSocketChannel from a FileDescriptor we should try to obtain the remote and local - // address from it. This is needed as the FileDescriptor may be bound already. - local = fd.localAddress(); config = new EpollServerSocketChannelConfig(this); } @@ -74,10 +67,7 @@ public final class EpollServerSocketChannel extends AbstractEpollServerChannel i @Override protected void doBind(SocketAddress localAddress) throws Exception { - InetSocketAddress addr = (InetSocketAddress) localAddress; - checkResolvable(addr); - socket.bind(addr); - local = socket.localAddress(); + super.doBind(localAddress); if (Native.IS_SUPPORTING_TCP_FASTOPEN && config.getTcpFastopen() > 0) { socket.setTcpFastOpen(config.getTcpFastopen()); } @@ -100,11 +90,6 @@ public final class EpollServerSocketChannel extends AbstractEpollServerChannel i return config; } - @Override - protected InetSocketAddress localAddress0() { - return local; - } - @Override protected Channel newChildChannel(int fd, byte[] address, int offset, int len) throws Exception { return new EpollSocketChannel(this, new LinuxSocket(fd), address(address, offset, len)); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 0d7befff6b..34559904e5 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -20,14 +20,10 @@ import io.netty.channel.ChannelException; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; import io.netty.util.concurrent.GlobalEventExecutor; -import io.netty.util.internal.PlatformDependent; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.UnknownHostException; -import java.nio.channels.AlreadyConnectedException; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -43,25 +39,8 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme private final EpollSocketChannelConfig config; - private volatile InetSocketAddress local; - private volatile InetSocketAddress remote; - private InetSocketAddress requestedRemote; - private volatile Collection tcpMd5SigAddresses = Collections.emptyList(); - EpollSocketChannel(Channel parent, LinuxSocket fd, InetSocketAddress remote) { - super(parent, fd); - config = new EpollSocketChannelConfig(this); - // Directly cache the remote and local addresses - // See https://github.com/netty/netty/issues/2359 - this.remote = remote; - local = fd.localAddress(); - - if (parent instanceof EpollServerSocketChannel) { - tcpMd5SigAddresses = ((EpollServerSocketChannel) parent).tcpMd5SigAddresses(); - } - } - public EpollSocketChannel() { super(newSocketStream(), false); config = new EpollSocketChannelConfig(this); @@ -69,22 +48,23 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme public EpollSocketChannel(int fd) { super(fd); - // As we create an EpollSocketChannel from a FileDescriptor we should try to obtain the remote and local - // address from it. This is needed as the FileDescriptor may be bound/connected already. - remote = socket.remoteAddress(); - local = socket.localAddress(); config = new EpollSocketChannelConfig(this); } EpollSocketChannel(LinuxSocket fd, boolean active) { super(fd, active); - // As we create an EpollSocketChannel from a FileDescriptor we should try to obtain the remote and local - // address from it. This is needed as the FileDescriptor may be bound/connected already. - remote = fd.remoteAddress(); - local = fd.localAddress(); config = new EpollSocketChannelConfig(this); } + EpollSocketChannel(Channel parent, LinuxSocket fd, InetSocketAddress remoteAddress) { + super(parent, fd, remoteAddress); + config = new EpollSocketChannelConfig(this); + + if (parent instanceof EpollServerSocketChannel) { + tcpMd5SigAddresses = ((EpollServerSocketChannel) parent).tcpMd5SigAddresses(); + } + } + /** * Returns the {@code TCP_INFO} for the current socket. See man 7 tcp. */ @@ -115,23 +95,6 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme return (InetSocketAddress) super.localAddress(); } - @Override - protected SocketAddress localAddress0() { - return local; - } - - @Override - protected SocketAddress remoteAddress0() { - return remote; - } - - @Override - protected void doBind(SocketAddress local) throws Exception { - InetSocketAddress localAddress = (InetSocketAddress) local; - socket.bind(localAddress); - this.local = socket.localAddress(); - } - @Override public EpollSocketChannelConfig config() { return config; @@ -147,54 +110,6 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme return new EpollSocketChannelUnsafe(); } - private static InetSocketAddress computeRemoteAddr(InetSocketAddress remoteAddr, InetSocketAddress osRemoteAddr) { - if (osRemoteAddr != null) { - if (PlatformDependent.javaVersion() >= 7) { - try { - // Only try to construct a new InetSocketAddress if we using java >= 7 as getHostString() does not - // exists in earlier releases and so the retrieval of the hostname could block the EventLoop if a - // reverse lookup would be needed. - return new InetSocketAddress(InetAddress.getByAddress(remoteAddr.getHostString(), - osRemoteAddr.getAddress().getAddress()), - osRemoteAddr.getPort()); - } catch (UnknownHostException ignore) { - // Should never happen but fallback to osRemoteAddr anyway. - } - } - return osRemoteAddr; - } - return remoteAddr; - } - - @Override - protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { - if (localAddress != null) { - checkResolvable((InetSocketAddress) localAddress); - } - InetSocketAddress remoteAddr = (InetSocketAddress) remoteAddress; - checkResolvable(remoteAddr); - - if (remote != null) { - // Check if already connected before trying to connect. This is needed as connect(...) will not return -1 - // and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished - // later. - throw new AlreadyConnectedException(); - } - - boolean connected = super.doConnect(remoteAddress, localAddress); - if (connected) { - remote = computeRemoteAddr(remoteAddr, socket.remoteAddress()); - } else { - // Store for later usage in doFinishConnect() - requestedRemote = remoteAddr; - } - // We always need to set the localAddress even if not connected yet as the bind already took place. - // - // See https://github.com/netty/netty/issues/3463 - local = socket.localAddress(); - return connected; - } - private final class EpollSocketChannelUnsafe extends EpollStreamUnsafe { @Override protected Executor prepareToClose() { @@ -216,19 +131,9 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme } return null; } - - @Override - boolean doFinishConnect() throws Exception { - if (super.doFinishConnect()) { - remote = computeRemoteAddr(requestedRemote, socket.remoteAddress()); - requestedRemote = null; - return true; - } - return false; - } } void setTcpMd5Sig(Map keys) throws IOException { - this.tcpMd5SigAddresses = TcpMd5Util.newTcpMd5Sigs(this, tcpMd5SigAddresses, keys); + tcpMd5SigAddresses = TcpMd5Util.newTcpMd5Sigs(this, tcpMd5SigAddresses, keys); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramConnectNotExistsTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramConnectNotExistsTest.java new file mode 100644 index 0000000000..86042c1b51 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramConnectNotExistsTest.java @@ -0,0 +1,30 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.DatagramConnectNotExistsTest; + +import java.util.List; + +public class EpollDatagramConnectNotExistsTest extends DatagramConnectNotExistsTest { + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.datagramSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java index 2da13e16ae..225c1d2a41 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java @@ -179,6 +179,18 @@ class EpollSocketTestPermutation extends SocketTestPermutation { ); } + @Override + public List> datagramSocket() { + return Collections.>singletonList( + new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollDatagramChannel.class); + } + } + ); + } + public boolean isServerFastOpen() { return AccessController.doPrivileged(new PrivilegedAction() { @Override diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index a26bb906fa..0e490b9235 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -23,7 +23,11 @@ import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelPromise; +import io.netty.channel.ConnectTimeoutException; import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.ChannelInputShutdownEvent; @@ -34,14 +38,28 @@ import io.netty.util.ReferenceCountUtil; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.AlreadyConnectedException; +import java.nio.channels.ConnectionPendingException; import java.nio.channels.NotYetConnectedException; import java.nio.channels.UnresolvedAddressException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr; import static io.netty.util.internal.ObjectUtil.checkNotNull; abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChannel { private static final ChannelMetadata METADATA = new ChannelMetadata(false); + /** + * The future of the current connection attempt. If not null, subsequent + * connection attempts will fail. + */ + private ChannelPromise connectPromise; + private ScheduledFuture connectTimeoutFuture; + private SocketAddress requestedRemoteAddress; + final BsdSocket socket; private boolean readFilterEnabled = true; private boolean writeFilterEnabled; @@ -57,6 +75,8 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan long jniSelfPtr; protected volatile boolean active; + private volatile SocketAddress local; + private volatile SocketAddress remote; AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) { this(parent, fd, active, false); @@ -67,6 +87,22 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan socket = checkNotNull(fd, "fd"); this.active = active; this.writeFilterEnabled = writeFilterEnabled; + if (active) { + // Directly cache the remote and local addresses + // See https://github.com/netty/netty/issues/2359 + local = fd.localAddress(); + remote = fd.remoteAddress(); + } + } + + AbstractKQueueChannel(Channel parent, BsdSocket fd, SocketAddress remote) { + super(parent); + socket = checkNotNull(fd, "fd"); + active = true; + // Directly cache the remote and local addresses + // See https://github.com/netty/netty/issues/2359 + this.remote = remote; + local = fd.localAddress(); } static boolean isSoErrorZero(BsdSocket fd) { @@ -393,12 +429,14 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan } } - void writeReady() { - if (socket.isOutputShutdown()) { - return; + final void writeReady() { + if (connectPromise != null) { + // pending connect which is now complete so handle it. + finishConnect(); + } else if (!socket.isOutputShutdown()) { + // directly call super.flush0() to force a flush now + super.flush0(); } - // directly call super.flush0() to force a flush now - super.flush0(); } /** @@ -478,5 +516,209 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan pipeline().fireUserEventTriggered(evt); close(voidPromise()); } + + @Override + public void connect( + final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { + if (!promise.setUncancellable() || !ensureOpen(promise)) { + return; + } + + try { + if (connectPromise != null) { + throw new ConnectionPendingException(); + } + + boolean wasActive = isActive(); + if (doConnect(remoteAddress, localAddress)) { + fulfillConnectPromise(promise, wasActive); + } else { + connectPromise = promise; + requestedRemoteAddress = remoteAddress; + + // Schedule connect timeout. + int connectTimeoutMillis = config().getConnectTimeoutMillis(); + if (connectTimeoutMillis > 0) { + connectTimeoutFuture = eventLoop().schedule(new Runnable() { + @Override + public void run() { + ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise; + ConnectTimeoutException cause = + new ConnectTimeoutException("connection timed out: " + remoteAddress); + if (connectPromise != null && connectPromise.tryFailure(cause)) { + close(voidPromise()); + } + } + }, connectTimeoutMillis, TimeUnit.MILLISECONDS); + } + + promise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isCancelled()) { + if (connectTimeoutFuture != null) { + connectTimeoutFuture.cancel(false); + } + connectPromise = null; + close(voidPromise()); + } + } + }); + } + } catch (Throwable t) { + closeIfClosed(); + promise.tryFailure(annotateConnectException(t, remoteAddress)); + } + } + + private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { + if (promise == null) { + // Closed via cancellation and the promise has been notified already. + return; + } + active = true; + + // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel. + // We still need to ensure we call fireChannelActive() in this case. + boolean active = isActive(); + + // trySuccess() will return false if a user cancelled the connection attempt. + boolean promiseSet = promise.trySuccess(); + + // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, + // because what happened is what happened. + if (!wasActive && active) { + pipeline().fireChannelActive(); + } + + // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). + if (!promiseSet) { + close(voidPromise()); + } + } + + private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) { + if (promise == null) { + // Closed via cancellation and the promise has been notified already. + return; + } + + // Use tryFailure() instead of setFailure() to avoid the race against cancel(). + promise.tryFailure(cause); + closeIfClosed(); + } + + private void finishConnect() { + // Note this method is invoked by the event loop only if the connection attempt was + // neither cancelled nor timed out. + + assert eventLoop().inEventLoop(); + + boolean connectStillInProgress = false; + try { + boolean wasActive = isActive(); + if (!doFinishConnect()) { + connectStillInProgress = true; + return; + } + fulfillConnectPromise(connectPromise, wasActive); + } catch (Throwable t) { + fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); + } finally { + if (!connectStillInProgress) { + // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used + // See https://github.com/netty/netty/issues/1770 + if (connectTimeoutFuture != null) { + connectTimeoutFuture.cancel(false); + } + connectPromise = null; + } + } + } + + private boolean doFinishConnect() throws Exception { + if (socket.finishConnect()) { + writeFilter(false); + if (requestedRemoteAddress instanceof InetSocketAddress) { + remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress()); + } + requestedRemoteAddress = null; + return true; + } + writeFilter(true); + return false; + } + } + + @Override + protected void doBind(SocketAddress local) throws Exception { + if (local instanceof InetSocketAddress) { + checkResolvable((InetSocketAddress) local); + } + socket.bind(local); + this.local = socket.localAddress(); + } + + /** + * Connect to the remote peer + */ + protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + if (localAddress instanceof InetSocketAddress) { + checkResolvable((InetSocketAddress) localAddress); + } + + InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress + ? (InetSocketAddress) remoteAddress : null; + if (remoteSocketAddr != null) { + checkResolvable(remoteSocketAddr); + } + + if (remote != null) { + // Check if already connected before trying to connect. This is needed as connect(...) will not return -1 + // and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished + // later. + throw new AlreadyConnectedException(); + } + + if (localAddress != null) { + socket.bind(localAddress); + } + + boolean connected = doConnect0(remoteAddress); + if (connected) { + remote = remoteSocketAddr == null ? + remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress()); + } + // We always need to set the localAddress even if not connected yet as the bind already took place. + // + // See https://github.com/netty/netty/issues/3463 + local = socket.localAddress(); + return connected; + } + + private boolean doConnect0(SocketAddress remote) throws Exception { + boolean success = false; + try { + boolean connected = socket.connect(remote); + if (!connected) { + writeFilter(true); + } + success = true; + return connected; + } finally { + if (!success) { + doClose(); + } + } + } + + @Override + protected SocketAddress localAddress0() { + return local; + } + + @Override + protected SocketAddress remoteAddress0() { + return remote; } } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java index 397a96068b..47dad9c394 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java @@ -72,18 +72,17 @@ public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception; + @Override + protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + throw new UnsupportedOperationException(); + } + final class KQueueServerSocketUnsafe extends AbstractKQueueUnsafe { // Will hold the remote address after accept(...) was successful. // We need 24 bytes for the address as maximum + 1 byte for storing the capacity. // So use 26 bytes as it's a power of two. private final byte[] acceptedAddress = new byte[26]; - @Override - public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) { - // Connect not supported by ServerChannel implementations - channelPromise.setFailure(new UnsupportedOperationException()); - } - @Override void readReady(KQueueRecvByteAllocatorHandle allocHandle) { assert eventLoop().inEventLoop(); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java index db8bb55fc5..b3a302cd82 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java @@ -20,12 +20,10 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.channel.ConnectTimeoutException; import io.netty.channel.DefaultFileRegion; import io.netty.channel.EventLoop; import io.netty.channel.FileRegion; @@ -35,37 +33,31 @@ import io.netty.channel.unix.SocketWritableByteChannel; import io.netty.channel.unix.UnixChannelUtil; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; -import io.netty.util.internal.ThrowableUtil; import io.netty.util.internal.UnstableApi; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.ConnectionPendingException; import java.nio.channels.WritableByteChannel; import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; @UnstableApi public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel implements DuplexChannel { private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); - private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( - new ClosedChannelException(), AbstractKQueueStreamChannel.class, "doClose()"); private static final String EXPECTED_TYPES = " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + StringUtil.simpleClassName(DefaultFileRegion.class) + ')'; - private ChannelPromise connectPromise; - private ScheduledFuture connectTimeoutFuture; - private SocketAddress requestedRemoteAddress; private WritableByteChannel byteChannel; AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, boolean active) { super(parent, fd, active, true); } + AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, SocketAddress remote) { + super(parent, fd, remote); + } + AbstractKQueueStreamChannel(BsdSocket fd) { this(null, fd, isSoErrorZero(fd)); } @@ -513,47 +505,6 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel return promise; } - @Override - protected void doClose() throws Exception { - ChannelPromise promise = connectPromise; - if (promise != null) { - // Use tryFailure() instead of setFailure() to avoid the race against cancel(). - promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION); - connectPromise = null; - } - - ScheduledFuture future = connectTimeoutFuture; - if (future != null) { - future.cancel(false); - connectTimeoutFuture = null; - } - // Calling super.doClose() first so splceTo(...) will fail on next call. - super.doClose(); - } - - /** - * Connect to the remote peer - */ - protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { - if (localAddress != null) { - socket.bind(localAddress); - } - - boolean success = false; - try { - boolean connected = socket.connect(remoteAddress); - if (!connected) { - writeFilter(true); - } - success = true; - return connected; - } finally { - if (!success) { - doClose(); - } - } - } - class KQueueStreamUnsafe extends AbstractKQueueUnsafe { // Overridden here just to be able to access this method from AbstractKQueueStreamChannel @Override @@ -622,145 +573,6 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel } } - @Override - void writeReady() { - if (connectPromise != null) { - // pending connect which is now complete so handle it. - finishConnect(); - } else { - super.writeReady(); - } - } - - @Override - public void connect( - final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { - if (!promise.setUncancellable() || !ensureOpen(promise)) { - return; - } - - try { - if (connectPromise != null) { - throw new ConnectionPendingException(); - } - - boolean wasActive = isActive(); - if (doConnect(remoteAddress, localAddress)) { - fulfillConnectPromise(promise, wasActive); - } else { - connectPromise = promise; - requestedRemoteAddress = remoteAddress; - - // Schedule connect timeout. - int connectTimeoutMillis = config().getConnectTimeoutMillis(); - if (connectTimeoutMillis > 0) { - connectTimeoutFuture = eventLoop().schedule(new Runnable() { - @Override - public void run() { - ChannelPromise connectPromise = AbstractKQueueStreamChannel.this.connectPromise; - ConnectTimeoutException cause = - new ConnectTimeoutException("connection timed out: " + remoteAddress); - if (connectPromise != null && connectPromise.tryFailure(cause)) { - close(voidPromise()); - } - } - }, connectTimeoutMillis, TimeUnit.MILLISECONDS); - } - - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isCancelled()) { - if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); - } - connectPromise = null; - close(voidPromise()); - } - } - }); - } - } catch (Throwable t) { - closeIfClosed(); - promise.tryFailure(annotateConnectException(t, remoteAddress)); - } - } - - private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { - if (promise == null) { - // Closed via cancellation and the promise has been notified already. - return; - } - active = true; - - // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel. - // We still need to ensure we call fireChannelActive() in this case. - boolean active = isActive(); - - // trySuccess() will return false if a user cancelled the connection attempt. - boolean promiseSet = promise.trySuccess(); - - // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, - // because what happened is what happened. - if (!wasActive && active) { - pipeline().fireChannelActive(); - } - - // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). - if (!promiseSet) { - close(voidPromise()); - } - } - - private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) { - if (promise == null) { - // Closed via cancellation and the promise has been notified already. - return; - } - - // Use tryFailure() instead of setFailure() to avoid the race against cancel(). - promise.tryFailure(cause); - closeIfClosed(); - } - - private void finishConnect() { - // Note this method is invoked by the event loop only if the connection attempt was - // neither cancelled nor timed out. - - assert eventLoop().inEventLoop(); - - boolean connectStillInProgress = false; - try { - boolean wasActive = isActive(); - if (!doFinishConnect()) { - connectStillInProgress = true; - return; - } - fulfillConnectPromise(connectPromise, wasActive); - } catch (Throwable t) { - fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); - } finally { - if (!connectStillInProgress) { - // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used - // See https://github.com/netty/netty/issues/1770 - if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); - } - connectPromise = null; - } - } - } - - boolean doFinishConnect() throws Exception { - if (socket.finishConnect()) { - writeFilter(false); - return true; - } else { - writeFilter(true); - return false; - } - } - private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close, KQueueRecvByteAllocatorHandle allocHandle) { if (byteBuf != null) { diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java index 90c5f86a98..887951a28f 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java @@ -30,7 +30,6 @@ import io.netty.channel.socket.DatagramPacket; import io.netty.channel.unix.DatagramSocketAddress; import io.netty.channel.unix.IovArray; import io.netty.channel.unix.UnixChannelUtil; -import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.UnstableApi; @@ -41,7 +40,6 @@ import java.net.NetworkInterface; import java.net.SocketAddress; import java.net.SocketException; import java.nio.ByteBuffer; -import java.nio.channels.NotYetConnectedException; import java.util.ArrayList; import java.util.List; @@ -57,8 +55,6 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement StringUtil.simpleClassName(InetSocketAddress.class) + ">, " + StringUtil.simpleClassName(ByteBuf.class) + ')'; - private volatile InetSocketAddress local; - private volatile InetSocketAddress remote; private volatile boolean connected; private final KQueueDatagramChannelConfig config; @@ -74,9 +70,6 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement KQueueDatagramChannel(BsdSocket socket, boolean active) { super(null, socket, active); config = new KQueueDatagramChannelConfig(this); - // As we create an EpollDatagramChannel from a FileDescriptor we should try to obtain the remote and local - // address from it. This is needed as the FileDescriptor may be bound already. - local = socket.localAddress(); } @Override @@ -259,22 +252,9 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement return new KQueueDatagramChannelUnsafe(); } - @Override - protected InetSocketAddress localAddress0() { - return local; - } - - @Override - protected InetSocketAddress remoteAddress0() { - return remote; - } - @Override protected void doBind(SocketAddress localAddress) throws Exception { - InetSocketAddress addr = (InetSocketAddress) localAddress; - checkResolvable(addr); - socket.bind(addr); - local = socket.localAddress(); + super.doBind(localAddress); active = true; } @@ -332,30 +312,35 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement return true; } - if (remoteAddress == null) { - remoteAddress = remote; - if (remoteAddress == null) { - throw new NotYetConnectedException(); - } - } - - final int writtenBytes; + final long writtenBytes; if (data.hasMemoryAddress()) { long memoryAddress = data.memoryAddress(); - writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(), - remoteAddress.getAddress(), remoteAddress.getPort()); + if (remoteAddress == null) { + writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex()); + } else { + writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(), + remoteAddress.getAddress(), remoteAddress.getPort()); + } } else if (data.nioBufferCount() > 1) { IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray(); array.add(data); int cnt = array.count(); assert cnt != 0; - writtenBytes = socket.sendToAddresses(array.memoryAddress(0), - cnt, remoteAddress.getAddress(), remoteAddress.getPort()); + if (remoteAddress == null) { + writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt); + } else { + writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt, + remoteAddress.getAddress(), remoteAddress.getPort()); + } } else { ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes()); - writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(), - remoteAddress.getAddress(), remoteAddress.getPort()); + if (remoteAddress == null) { + writtenBytes = socket.write(nioData, nioData.position(), nioData.limit()); + } else { + writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(), + remoteAddress.getAddress(), remoteAddress.getPort()); + } } return writtenBytes > 0; @@ -399,48 +384,27 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement @Override protected void doDisconnect() throws Exception { + socket.disconnect(); + connected = active = false; + } + + @Override + protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + if (super.doConnect(remoteAddress, localAddress)) { + connected = true; + return true; + } + return false; + } + + @Override + protected void doClose() throws Exception { + super.doClose(); connected = false; } final class KQueueDatagramChannelUnsafe extends AbstractKQueueUnsafe { - @Override - public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) { - boolean success = false; - try { - try { - boolean wasActive = isActive(); - InetSocketAddress remoteAddress = (InetSocketAddress) remote; - if (local != null) { - InetSocketAddress localAddress = (InetSocketAddress) local; - doBind(localAddress); - } - - checkResolvable(remoteAddress); - KQueueDatagramChannel.this.remote = remoteAddress; - KQueueDatagramChannel.this.local = socket.localAddress(); - success = true; - - // First notify the promise before notifying the handler. - channelPromise.trySuccess(); - - // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, - // because what happened is what happened. - if (!wasActive && isActive()) { - pipeline().fireChannelActive(); - } - } finally { - if (!success) { - doClose(); - } else { - connected = true; - } - } - } catch (Throwable cause) { - channelPromise.tryFailure(cause); - } - } - @Override void readReady(KQueueRecvByteAllocatorHandle allocHandle) { assert eventLoop().inEventLoop(); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueServerSocketChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueServerSocketChannel.java index ae37067f30..56dd9c135f 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueServerSocketChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueServerSocketChannel.java @@ -29,7 +29,6 @@ import static io.netty.channel.unix.NativeInetAddress.address; @UnstableApi public final class KQueueServerSocketChannel extends AbstractKQueueServerChannel implements ServerSocketChannel { private final KQueueServerSocketChannelConfig config; - private volatile InetSocketAddress local; public KQueueServerSocketChannel() { super(newSocketStream(), false); @@ -45,18 +44,11 @@ public final class KQueueServerSocketChannel extends AbstractKQueueServerChannel KQueueServerSocketChannel(BsdSocket fd) { super(fd); config = new KQueueServerSocketChannelConfig(this); - - // As we create an KQueueServerSocketChannel from a FileDescriptor we should try to obtain the remote and local - // address from it. This is needed as the FileDescriptor may be bound already. - local = fd.localAddress(); } KQueueServerSocketChannel(BsdSocket fd, boolean active) { super(fd, active); config = new KQueueServerSocketChannelConfig(this); - // As we create an KQueueServerSocketChannel from a FileDescriptor we should try to obtain the remote and local - // address from it. This is needed as the FileDescriptor may be bound already. - local = fd.localAddress(); } @Override @@ -66,10 +58,8 @@ public final class KQueueServerSocketChannel extends AbstractKQueueServerChannel @Override protected void doBind(SocketAddress localAddress) throws Exception { - InetSocketAddress addr = (InetSocketAddress) localAddress; - checkResolvable(addr); - socket.bind(addr); - local = socket.localAddress(); + super.doBind(localAddress); + // TODO(scott): tcp fast open here! socket.listen(config.getBacklog()); active = true; @@ -90,11 +80,6 @@ public final class KQueueServerSocketChannel extends AbstractKQueueServerChannel return config; } - @Override - protected InetSocketAddress localAddress0() { - return local; - } - @Override protected Channel newChildChannel(int fd, byte[] address, int offset, int len) throws Exception { return new KQueueSocketChannel(this, new BsdSocket(fd), address(address, offset, len)); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java index 89688e2945..ecccbe8164 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java @@ -19,24 +19,15 @@ import io.netty.channel.Channel; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; import io.netty.util.concurrent.GlobalEventExecutor; -import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.UnstableApi; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.UnknownHostException; -import java.nio.channels.AlreadyConnectedException; import java.util.concurrent.Executor; @UnstableApi public final class KQueueSocketChannel extends AbstractKQueueStreamChannel implements SocketChannel { private final KQueueSocketChannelConfig config; - private volatile InetSocketAddress local; - private volatile InetSocketAddress remote; - private InetSocketAddress requestedRemote; - public KQueueSocketChannel() { super(null, BsdSocket.newSocketStream(), false); config = new KQueueSocketChannelConfig(this); @@ -44,20 +35,12 @@ public final class KQueueSocketChannel extends AbstractKQueueStreamChannel imple public KQueueSocketChannel(int fd) { super(new BsdSocket(fd)); - // As we create an EpollSocketChannel from a FileDescriptor we should try to obtain the remote and local - // address from it. This is needed as the FileDescriptor may be bound/connected already. - remote = socket.remoteAddress(); - local = socket.localAddress(); config = new KQueueSocketChannelConfig(this); } - KQueueSocketChannel(Channel parent, BsdSocket fd, InetSocketAddress remote) { - super(parent, fd, true); + KQueueSocketChannel(Channel parent, BsdSocket fd, InetSocketAddress remoteAddress) { + super(parent, fd, remoteAddress); config = new KQueueSocketChannelConfig(this); - // Directly cache the remote and local addresses - // See https://github.com/netty/netty/issues/2359 - this.remote = remote; - local = fd.localAddress(); } @Override @@ -70,23 +53,6 @@ public final class KQueueSocketChannel extends AbstractKQueueStreamChannel imple return (InetSocketAddress) super.localAddress(); } - @Override - protected SocketAddress localAddress0() { - return local; - } - - @Override - protected SocketAddress remoteAddress0() { - return remote; - } - - @Override - protected void doBind(SocketAddress local) throws Exception { - InetSocketAddress localAddress = (InetSocketAddress) local; - socket.bind(localAddress); - this.local = socket.localAddress(); - } - @Override public KQueueSocketChannelConfig config() { return config; @@ -102,54 +68,6 @@ public final class KQueueSocketChannel extends AbstractKQueueStreamChannel imple return new KQueueSocketChannelUnsafe(); } - private static InetSocketAddress computeRemoteAddr(InetSocketAddress remoteAddr, InetSocketAddress osRemoteAddr) { - if (osRemoteAddr != null) { - if (PlatformDependent.javaVersion() >= 7) { - try { - // Only try to construct a new InetSocketAddress if we using java >= 7 as getHostString() does not - // exists in earlier releases and so the retrieval of the hostname could block the EventLoop if a - // reverse lookup would be needed. - return new InetSocketAddress(InetAddress.getByAddress(remoteAddr.getHostString(), - osRemoteAddr.getAddress().getAddress()), - osRemoteAddr.getPort()); - } catch (UnknownHostException ignore) { - // Should never happen but fallback to osRemoteAddr anyway. - } - } - return osRemoteAddr; - } - return remoteAddr; - } - - @Override - protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { - if (localAddress != null) { - checkResolvable((InetSocketAddress) localAddress); - } - InetSocketAddress remoteAddr = (InetSocketAddress) remoteAddress; - checkResolvable(remoteAddr); - - if (remote != null) { - // Check if already connected before trying to connect. This is needed as connect(...) will not return -1 - // and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished - // later. - throw new AlreadyConnectedException(); - } - - boolean connected = super.doConnect(remoteAddress, localAddress); - if (connected) { - remote = computeRemoteAddr(remoteAddr, socket.remoteAddress()); - } else { - // Store for later usage in doFinishConnect() - requestedRemote = remoteAddr; - } - // We always need to set the localAddress even if not connected yet as the bind already took place. - // - // See https://github.com/netty/netty/issues/3463 - local = socket.localAddress(); - return connected; - } - private final class KQueueSocketChannelUnsafe extends KQueueStreamUnsafe { @Override protected Executor prepareToClose() { @@ -171,15 +89,5 @@ public final class KQueueSocketChannel extends AbstractKQueueStreamChannel imple } return null; } - - @Override - boolean doFinishConnect() throws Exception { - if (super.doFinishConnect()) { - remote = computeRemoteAddr(requestedRemote, socket.remoteAddress()); - requestedRemote = null; - return true; - } - return false; - } } } diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDatagramConnectNotExistsTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDatagramConnectNotExistsTest.java new file mode 100644 index 0000000000..560d048b15 --- /dev/null +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDatagramConnectNotExistsTest.java @@ -0,0 +1,30 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.kqueue; + +import io.netty.bootstrap.Bootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.DatagramConnectNotExistsTest; + +import java.util.List; + +public class KQueueDatagramConnectNotExistsTest extends DatagramConnectNotExistsTest { + + @Override + protected List> newFactories() { + return KQueueSocketTestPermutation.INSTANCE.datagramSocket(); + } +} diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketTestPermutation.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketTestPermutation.java index a4f945decd..ab2bef9cf9 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketTestPermutation.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketTestPermutation.java @@ -25,7 +25,6 @@ import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.unix.DomainSocketAddress; -import io.netty.channel.unix.Socket; import io.netty.channel.unix.tests.UnixTestUtils; import io.netty.testsuite.transport.TestsuitePermutation; import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory; @@ -34,8 +33,6 @@ import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.io.File; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -166,6 +163,17 @@ class KQueueSocketTestPermutation extends SocketTestPermutation { ); } + @Override + public List> datagramSocket() { + return Collections.>singletonList( + new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(KQUEUE_WORKER_GROUP).channel(KQueueDatagramChannel.class); + } + } + ); + } public static DomainSocketAddress newSocketAddress() { return UnixTestUtils.newSocketAddress(); } diff --git a/transport-native-unix-common/src/main/c/netty_unix_errors.c b/transport-native-unix-common/src/main/c/netty_unix_errors.c index 6a92604de2..5103ceefc1 100644 --- a/transport-native-unix-common/src/main/c/netty_unix_errors.c +++ b/transport-native-unix-common/src/main/c/netty_unix_errors.c @@ -24,6 +24,7 @@ static jclass runtimeExceptionClass = NULL; static jclass channelExceptionClass = NULL; static jclass ioExceptionClass = NULL; +static jclass portUnreachableExceptionClass = NULL; static jclass closedChannelExceptionClass = NULL; static jmethodID closedChannelExceptionMethodId = NULL; @@ -56,6 +57,10 @@ void netty_unix_errors_throwIOException(JNIEnv* env, char* message) { (*env)->ThrowNew(env, ioExceptionClass, message); } +void netty_unix_errors_throwPortUnreachableException(JNIEnv* env, char* message) { + (*env)->ThrowNew(env, portUnreachableExceptionClass, message); +} + void netty_unix_errors_throwIOExceptionErrorNo(JNIEnv* env, char* message, int errorNumber) { char* allocatedMessage = exceptionMessage(message, errorNumber); (*env)->ThrowNew(env, ioExceptionClass, allocatedMessage); @@ -207,6 +212,18 @@ jint netty_unix_errors_JNI_OnLoad(JNIEnv* env, const char* packagePrefix) { return JNI_ERR; } + jclass localPortUnreachableExceptionClass = (*env)->FindClass(env, "java/net/PortUnreachableException"); + if (localPortUnreachableExceptionClass == NULL) { + // pending exception... + return JNI_ERR; + } + portUnreachableExceptionClass = (jclass) (*env)->NewGlobalRef(env, localPortUnreachableExceptionClass); + if (portUnreachableExceptionClass == NULL) { + // out-of-memory! + netty_unix_errors_throwOutOfMemoryError(env); + return JNI_ERR; + } + return NETTY_JNI_VERSION; } @@ -224,6 +241,10 @@ void netty_unix_errors_JNI_OnUnLoad(JNIEnv* env) { (*env)->DeleteGlobalRef(env, ioExceptionClass); ioExceptionClass = NULL; } + if (portUnreachableExceptionClass != NULL) { + (*env)->DeleteGlobalRef(env, portUnreachableExceptionClass); + portUnreachableExceptionClass = NULL; + } if (closedChannelExceptionClass != NULL) { (*env)->DeleteGlobalRef(env, closedChannelExceptionClass); closedChannelExceptionClass = NULL; diff --git a/transport-native-unix-common/src/main/c/netty_unix_errors.h b/transport-native-unix-common/src/main/c/netty_unix_errors.h index c623956f3b..e8092b50bb 100644 --- a/transport-native-unix-common/src/main/c/netty_unix_errors.h +++ b/transport-native-unix-common/src/main/c/netty_unix_errors.h @@ -23,6 +23,7 @@ void netty_unix_errors_throwRuntimeExceptionErrorNo(JNIEnv* env, char* message, void netty_unix_errors_throwChannelExceptionErrorNo(JNIEnv* env, char* message, int errorNumber); void netty_unix_errors_throwIOException(JNIEnv* env, char* message); void netty_unix_errors_throwIOExceptionErrorNo(JNIEnv* env, char* message, int errorNumber); +void netty_unix_errors_throwPortUnreachableException(JNIEnv* env, char* message); void netty_unix_errors_throwClosedChannelException(JNIEnv* env); void netty_unix_errors_throwOutOfMemoryError(JNIEnv* env); diff --git a/transport-native-unix-common/src/main/c/netty_unix_socket.c b/transport-native-unix-common/src/main/c/netty_unix_socket.c index 96879352ff..7b98abace9 100644 --- a/transport-native-unix-common/src/main/c/netty_unix_socket.c +++ b/transport-native-unix-common/src/main/c/netty_unix_socket.c @@ -305,6 +305,10 @@ static jobject _recvFrom(JNIEnv* env, jint fd, void* buffer, jint pos, jint limi netty_unix_errors_throwClosedChannelException(env); return NULL; } + if (err == ECONNREFUSED) { + netty_unix_errors_throwPortUnreachableException(env, "recvfrom() failed"); + return NULL; + } netty_unix_errors_throwIOExceptionErrorNo(env, "recvfrom() failed: ", err); return NULL; } @@ -412,6 +416,38 @@ static jint netty_unix_socket_finishConnect(JNIEnv* env, jclass clazz, jint fd) return -optval; } +static jint netty_unix_socket_disconnect(JNIEnv* env, jclass clazz, jint fd) { + struct sockaddr_storage addr; + int len; + + memset(&addr, 0, sizeof(addr)); + + // You can disconnect connection-less sockets by using AF_UNSPEC. + // See man 2 connect. + if (socketType == AF_INET6) { + struct sockaddr_in6* ip6addr = (struct sockaddr_in6*) &addr; + ip6addr->sin6_family = AF_UNSPEC; + len = sizeof(struct sockaddr_in6); + } else { + struct sockaddr_in* ipaddr = (struct sockaddr_in*) &addr; + ipaddr->sin_family = AF_UNSPEC; + len = sizeof(struct sockaddr_in); + } + + int res; + int err; + do { + res = connect(fd, (struct sockaddr*) &addr, len); + } while (res == -1 && ((err = errno) == EINTR)); + + // EAFNOSUPPORT is harmless in this case. + // See http://www.unix.com/man-page/osx/2/connect/ + if (res < 0 && err != EAFNOSUPPORT) { + return -err; + } + return 0; +} + static jint netty_unix_socket_accept(JNIEnv* env, jclass clazz, jint fd, jbyteArray acceptedAddress) { jint socketFd; jsize len; @@ -822,6 +858,7 @@ static const JNINativeMethod fixed_method_table[] = { { "listen", "(II)I", (void *) netty_unix_socket_listen }, { "connect", "(I[BII)I", (void *) netty_unix_socket_connect }, { "finishConnect", "(I)I", (void *) netty_unix_socket_finishConnect }, + { "disconnect", "(I)I", (void *) netty_unix_socket_disconnect}, { "accept", "(I[B)I", (void *) netty_unix_socket_accept }, { "remoteAddress", "(I)[B", (void *) netty_unix_socket_remoteAddress }, { "localAddress", "(I)[B", (void *) netty_unix_socket_localAddress }, diff --git a/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java b/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java index a5739b564a..e9e7932873 100644 --- a/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java +++ b/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java @@ -22,11 +22,13 @@ import java.io.IOException; import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.PortUnreachableException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import static io.netty.channel.unix.Errors.ERRNO_EAGAIN_NEGATIVE; +import static io.netty.channel.unix.Errors.ERROR_ECONNREFUSED_NEGATIVE; import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE; import static io.netty.channel.unix.Errors.ERRNO_EWOULDBLOCK_NEGATIVE; import static io.netty.channel.unix.Errors.ioResult; @@ -141,6 +143,9 @@ public class Socket extends FileDescriptor { if (res >= 0) { return res; } + if (res == ERROR_ECONNREFUSED_NEGATIVE) { + throw new PortUnreachableException("sendTo failed"); + } return ioResult("sendTo", res, SEND_TO_CONNECTION_RESET_EXCEPTION, SEND_TO_CLOSED_CHANNEL_EXCEPTION); } @@ -162,6 +167,9 @@ public class Socket extends FileDescriptor { if (res >= 0) { return res; } + if (res == ERROR_ECONNREFUSED_NEGATIVE) { + throw new PortUnreachableException("sendToAddress failed"); + } return ioResult("sendToAddress", res, SEND_TO_ADDRESS_CONNECTION_RESET_EXCEPTION, SEND_TO_ADDRESS_CLOSED_CHANNEL_EXCEPTION); } @@ -183,6 +191,10 @@ public class Socket extends FileDescriptor { if (res >= 0) { return res; } + + if (res == ERROR_ECONNREFUSED_NEGATIVE) { + throw new PortUnreachableException("sendToAddresses failed"); + } return ioResult("sendToAddresses", res, CONNECTION_RESET_EXCEPTION_SENDMSG, SEND_TO_ADDRESSES_CLOSED_CHANNEL_EXCEPTION); } @@ -257,6 +269,13 @@ public class Socket extends FileDescriptor { return true; } + public final void disconnect() throws IOException { + int res = disconnect(fd); + if (res < 0) { + throwConnectException("disconnect", FINISH_CONNECT_REFUSED_EXCEPTION, res); + } + } + public final void bind(SocketAddress socketAddress) throws IOException { if (socketAddress instanceof InetSocketAddress) { InetSocketAddress addr = (InetSocketAddress) socketAddress; @@ -432,6 +451,7 @@ public class Socket extends FileDescriptor { private static native int connect(int fd, byte[] address, int scopeId, int port); private static native int connectDomainSocket(int fd, byte[] path); private static native int finishConnect(int fd); + private static native int disconnect(int fd); private static native int bind(int fd, byte[] address, int scopeId, int port); private static native int bindDomainSocket(int fd, byte[] path); private static native int listen(int fd, int backlog); diff --git a/transport-native-unix-common/src/main/java/io/netty/channel/unix/UnixChannelUtil.java b/transport-native-unix-common/src/main/java/io/netty/channel/unix/UnixChannelUtil.java index f201b86933..4d7bb0d29c 100644 --- a/transport-native-unix-common/src/main/java/io/netty/channel/unix/UnixChannelUtil.java +++ b/transport-native-unix-common/src/main/java/io/netty/channel/unix/UnixChannelUtil.java @@ -16,6 +16,11 @@ package io.netty.channel.unix; import io.netty.buffer.ByteBuf; +import io.netty.util.internal.PlatformDependent; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; import static io.netty.channel.unix.Limits.IOV_MAX; @@ -35,4 +40,23 @@ public final class UnixChannelUtil { static boolean isBufferCopyNeededForWrite(ByteBuf byteBuf, int iovMax) { return !byteBuf.hasMemoryAddress() && (!byteBuf.isDirect() || byteBuf.nioBufferCount() > iovMax); } + + public static InetSocketAddress computeRemoteAddr(InetSocketAddress remoteAddr, InetSocketAddress osRemoteAddr) { + if (osRemoteAddr != null) { + if (PlatformDependent.javaVersion() >= 7) { + try { + // Only try to construct a new InetSocketAddress if we using java >= 7 as getHostString() does not + // exists in earlier releases and so the retrieval of the hostname could block the EventLoop if a + // reverse lookup would be needed. + return new InetSocketAddress(InetAddress.getByAddress(remoteAddr.getHostString(), + osRemoteAddr.getAddress().getAddress()), + osRemoteAddr.getPort()); + } catch (UnknownHostException ignore) { + // Should never happen but fallback to osRemoteAddr anyway. + } + } + return osRemoteAddr; + } + return remoteAddr; + } }