Correctly handle connect/disconnect in EpollDatagramChannel / KQueueDatagramChannel

Motivation:

We did not correctly handle connect() and disconnect() in EpollDatagramChannel / KQueueDatagramChannel and so the behavior was different compared to NioDatagramChannel.

Modifications:

- Correct implement connect and disconnect methods
- Share connect and related code
- Add tests

Result:

EpollDatagramChannel / KQueueDatagramChannel also supports correctly connect() and disconnect() methods.
This commit is contained in:
Norman Maurer 2017-01-09 14:15:48 +01:00
parent 4448b8f42f
commit 4bb89dcc54
25 changed files with 1021 additions and 817 deletions

View File

@ -0,0 +1,69 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.oio.OioDatagramChannel;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.util.CharsetUtil;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import org.junit.Assert;
import org.junit.Test;
import java.net.PortUnreachableException;
import java.util.List;
public class DatagramConnectNotExistsTest extends AbstractClientSocketTest {
@Override
protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() {
return SocketTestPermutation.INSTANCE.datagramSocket();
}
@Test(timeout = 10000)
public void testConnectNotExists() throws Throwable {
run();
}
public void testConnectNotExists(Bootstrap cb) throws Throwable {
final Promise<Throwable> promise = ImmediateEventExecutor.INSTANCE.newPromise();
cb.handler(new ChannelInboundHandlerAdapter() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
promise.trySuccess(cause);
}
});
ChannelFuture future = cb.connect(NetUtil.LOCALHOST, SocketTestPermutation.BAD_PORT);
try {
Channel datagramChannel = future.syncUninterruptibly().channel();
Assert.assertTrue(datagramChannel.isActive());
datagramChannel.writeAndFlush(
Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII)).syncUninterruptibly();
if (!(datagramChannel instanceof OioDatagramChannel)) {
Assert.assertTrue(promise.syncUninterruptibly().getNow() instanceof PortUnreachableException);
}
} finally {
future.channel().close();
}
}
}

View File

@ -20,14 +20,17 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.nio.channels.NotYetConnectedException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -129,29 +132,20 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
assertTrue(buf.release());
}
@Test
public void testSimpleSendWithConnect() throws Throwable {
run();
}
public void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb) throws Throwable {
testSimpleSendWithConnect(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), BYTES, 1);
testSimpleSendWithConnect(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), BYTES, 4);
}
@SuppressWarnings("deprecation")
private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient,
final byte[] bytes, int count, WrapType wrapType)
throws Throwable {
final CountDownLatch latch = new CountDownLatch(count);
sb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
ByteBuf buf = msg.content();
assertEquals(bytes.length, buf.readableBytes());
for (byte b: bytes) {
assertEquals(b, buf.readByte());
}
latch.countDown();
}
});
}
});
cb.handler(new SimpleChannelInboundHandler<Object>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msgs) throws Exception {
@ -159,7 +153,9 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
}
});
Channel sc = sb.bind(newSocketAddress()).sync().channel();
final CountDownLatch latch = new CountDownLatch(count);
Channel sc = setupServerChannel(sb, bytes, latch);
Channel cc;
if (bindClient) {
cc = cb.bind(newSocketAddress()).sync().channel();
@ -167,15 +163,14 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
cc = cb.register().sync().channel();
}
InetSocketAddress addr = (InetSocketAddress) sc.localAddress();
for (int i = 0; i < count; i++) {
switch (wrapType) {
case DUP:
cc.write(new DatagramPacket(buf.retain().duplicate(), addr));
cc.write(new DatagramPacket(buf.retainedDuplicate(), addr));
break;
case SLICE:
cc.write(new DatagramPacket(buf.retain().slice(), addr));
cc.write(new DatagramPacket(buf.retainedSlice(), addr));
break;
case READ_ONLY:
cc.write(new DatagramPacket(buf.retain().asReadOnly(), addr));
@ -195,4 +190,90 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
sc.close().sync();
cc.close().sync();
}
private void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb, ByteBuf buf, final byte[] bytes, int count)
throws Throwable {
for (WrapType type: WrapType.values()) {
testSimpleSendWithConnect0(sb, cb, buf.retain(), bytes, count, type);
}
assertTrue(buf.release());
}
private void testSimpleSendWithConnect0(Bootstrap sb, Bootstrap cb, ByteBuf buf, final byte[] bytes, int count,
WrapType wrapType) throws Throwable {
cb.handler(new SimpleChannelInboundHandler<Object>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msgs) throws Exception {
// Nothing will be sent.
}
});
final CountDownLatch latch = new CountDownLatch(count);
Channel sc = setupServerChannel(sb, bytes, latch);
DatagramChannel cc = null;
try {
cc = (DatagramChannel) cb.connect(sc.localAddress()).sync().channel();
for (int i = 0; i < count; i++) {
switch (wrapType) {
case DUP:
cc.write(buf.retainedDuplicate());
break;
case SLICE:
cc.write(buf.retainedSlice());
break;
case READ_ONLY:
cc.write(buf.retain().asReadOnly());
break;
case NONE:
cc.write(buf.retain());
break;
default:
throw new Error("unknown wrap type: " + wrapType);
}
}
cc.flush();
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(cc.isConnected());
// Test what happens when we call disconnect()
cc.disconnect().syncUninterruptibly();
assertFalse(cc.isConnected());
ChannelFuture future = cc.writeAndFlush(
buf.retain().duplicate()).awaitUninterruptibly();
assertTrue(future.cause() instanceof NotYetConnectedException);
} finally {
// release as we used buf.retain() before
buf.release();
sc.close().sync();
if (cc != null) {
cc.close().sync();
}
}
}
@SuppressWarnings("deprecation")
private Channel setupServerChannel(Bootstrap sb, final byte[] bytes, final CountDownLatch latch)
throws Throwable {
sb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
ByteBuf buf = msg.content();
assertEquals(bytes.length, buf.readableBytes());
for (byte b : bytes) {
assertEquals(b, buf.readByte());
}
latch.countDown();
}
});
}
});
return sb.bind(newSocketAddress()).sync().channel();
}
}

View File

@ -16,7 +16,6 @@
package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
@ -26,8 +25,6 @@ import io.netty.util.internal.SocketUtils;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import org.junit.Test;
@ -38,21 +35,14 @@ import java.net.Socket;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
import static org.junit.Assume.*;
import static io.netty.testsuite.transport.socket.SocketTestPermutation.BAD_HOST;
import static io.netty.testsuite.transport.socket.SocketTestPermutation.BAD_PORT;
public class SocketConnectionAttemptTest extends AbstractClientSocketTest {
private static final String BAD_HOST = SystemPropertyUtil.get("io.netty.testsuite.badHost", "netty.io");
private static final int BAD_PORT = SystemPropertyUtil.getInt("io.netty.testsuite.badPort", 65535);
// See /etc/services
private static final int UNASSIGNED_PORT = 4;
static {
InternalLogger logger = InternalLoggerFactory.getInstance(SocketConnectionAttemptTest.class);
logger.debug("-Dio.netty.testsuite.badHost: {}", BAD_HOST);
logger.debug("-Dio.netty.testsuite.badPort: {}", BAD_PORT);
}
@Test(timeout = 30000)
public void testConnectTimeout() throws Throwable {
run();
@ -123,7 +113,8 @@ public class SocketConnectionAttemptTest extends AbstractClientSocketTest {
}
}
assumeThat("The connection attempt to " + BAD_HOST + " does not time out.", badHostTimedOut, is(true));
assumeThat("The connection attempt to " + BAD_HOST + " does not time out.",
badHostTimedOut, is(true));
run();
}

View File

@ -34,6 +34,9 @@ import io.netty.channel.socket.oio.OioSocketChannel;
import io.netty.testsuite.transport.TestsuitePermutation.BootstrapComboFactory;
import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
@ -41,6 +44,15 @@ import java.util.List;
public class SocketTestPermutation {
static final String BAD_HOST = SystemPropertyUtil.get("io.netty.testsuite.badHost", "netty.io");
static final int BAD_PORT = SystemPropertyUtil.getInt("io.netty.testsuite.badPort", 65535);
static {
InternalLogger logger = InternalLoggerFactory.getInstance(SocketConnectionAttemptTest.class);
logger.debug("-Dio.netty.testsuite.badHost: {}", BAD_HOST);
logger.debug("-Dio.netty.testsuite.badPort: {}", BAD_PORT);
}
static final SocketTestPermutation INSTANCE = new SocketTestPermutation();
protected static final int BOSSES = 2;
@ -168,4 +180,22 @@ public class SocketTestPermutation {
}
);
}
public List<BootstrapFactory<Bootstrap>> datagramSocket() {
return Arrays.asList(
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(nioWorkerGroup).channel(NioDatagramChannel.class);
}
},
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(oioWorkerGroup).channel(OioDatagramChannel.class)
.option(ChannelOption.SO_TIMEOUT, OIO_SO_TIMEOUT);
}
}
);
}
}

View File

@ -23,7 +23,11 @@ import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
@ -32,19 +36,40 @@ import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.Socket;
import io.netty.channel.unix.UnixChannel;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.ThrowableUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AlreadyConnectedException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ConnectionPendingException;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.UnresolvedAddressException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
abstract class AbstractEpollChannel extends AbstractChannel implements UnixChannel {
private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractEpollChannel.class, "doClose()");
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private final int readFlag;
final LinuxSocket socket;
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
private volatile SocketAddress local;
private volatile SocketAddress remote;
protected int flags = Native.EPOLLET;
boolean inputClosedSeenErrorOnRead;
boolean epollInReadyRunnablePending;
@ -61,6 +86,24 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
readFlag = flag;
flags |= flag;
this.active = active;
if (active) {
// Directly cache the remote and local addresses
// See https://github.com/netty/netty/issues/2359
local = fd.localAddress();
remote = fd.remoteAddress();
}
}
AbstractEpollChannel(Channel parent, LinuxSocket fd, int flag, SocketAddress remote) {
super(parent);
socket = checkNotNull(fd, "fd");
readFlag = flag;
flags |= flag;
active = true;
// Directly cache the remote and local addresses
// See https://github.com/netty/netty/issues/2359
this.remote = remote;
local = fd.localAddress();
}
static boolean isSoErrorZero(Socket fd) {
@ -114,6 +157,19 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
// socket which has not even been connected yet. This has been observed to block during unit tests.
inputClosedSeenErrorOnRead = true;
try {
ChannelPromise promise = connectPromise;
if (promise != null) {
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
connectPromise = null;
}
ScheduledFuture<?> future = connectTimeoutFuture;
if (future != null) {
future.cancel(false);
connectTimeoutFuture = null;
}
if (isRegistered()) {
doDeregister();
}
@ -472,12 +528,14 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
/**
* Called once a EPOLLOUT event is ready to be processed
*/
void epollOutReady() {
if (socket.isOutputShutdown()) {
return;
final void epollOutReady() {
if (connectPromise != null) {
// pending connect which is now complete so handle it.
finishConnect();
} else if (!socket.isOutputShutdown()) {
// directly call super.flush0() to force a flush now
super.flush0();
}
// directly call super.flush0() to force a flush now
super.flush0();
}
protected final void clearEpollIn0() {
@ -492,5 +550,213 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
unsafe().close(unsafe().voidPromise());
}
}
@Override
public void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractEpollChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
closeIfClosed();
promise.tryFailure(annotateConnectException(t, remoteAddress));
}
}
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
active = true;
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case.
boolean active = isActive();
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && active) {
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(cause);
closeIfClosed();
}
private void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
assert eventLoop().inEventLoop();
boolean connectStillInProgress = false;
try {
boolean wasActive = isActive();
if (!doFinishConnect()) {
connectStillInProgress = true;
return;
}
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
if (!connectStillInProgress) {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
}
/**
* Finish the connect
*/
private boolean doFinishConnect() throws Exception {
if (socket.finishConnect()) {
clearFlag(Native.EPOLLOUT);
if (requestedRemoteAddress instanceof InetSocketAddress) {
remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
}
requestedRemoteAddress = null;
return true;
}
setFlag(Native.EPOLLOUT);
return false;
}
}
@Override
protected void doBind(SocketAddress local) throws Exception {
if (local instanceof InetSocketAddress) {
checkResolvable((InetSocketAddress) local);
}
socket.bind(local);
this.local = socket.localAddress();
}
/**
* Connect to the remote peer
*/
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress instanceof InetSocketAddress) {
checkResolvable((InetSocketAddress) localAddress);
}
InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
? (InetSocketAddress) remoteAddress : null;
if (remoteSocketAddr != null) {
checkResolvable(remoteSocketAddr);
}
if (remote != null) {
// Check if already connected before trying to connect. This is needed as connect(...) will not return -1
// and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished
// later.
throw new AlreadyConnectedException();
}
if (localAddress != null) {
socket.bind(localAddress);
}
boolean connected = doConnect0(remoteAddress);
if (connected) {
remote = remoteSocketAddr == null ?
remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
}
// We always need to set the localAddress even if not connected yet as the bind already took place.
//
// See https://github.com/netty/netty/issues/3463
local = socket.localAddress();
return connected;
}
private boolean doConnect0(SocketAddress remote) throws Exception {
boolean success = false;
try {
boolean connected = socket.connect(remote);
if (!connected) {
setFlag(Native.EPOLLOUT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
@Override
protected SocketAddress localAddress0() {
return local;
}
@Override
protected SocketAddress remoteAddress0() {
return remote;
}
}

View File

@ -134,4 +134,9 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
}
}
}
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
throw new UnsupportedOperationException();
}
}

View File

@ -25,7 +25,6 @@ import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.EventLoop;
import io.netty.channel.FileRegion;
@ -45,12 +44,9 @@ import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ConnectionPendingException;
import java.nio.channels.WritableByteChannel;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static io.netty.channel.unix.FileDescriptor.pipe;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
@ -61,8 +57,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractEpollStreamChannel.class, "doClose()");
private static final ClosedChannelException CLEAR_SPLICE_QUEUE_CLOSED_CHANNEL_EXCEPTION =
ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
AbstractEpollStreamChannel.class, "clearSpliceQueue()");
@ -73,13 +67,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
ThrowableUtil.unknownStackTrace(new ClosedChannelException(),
AbstractEpollStreamChannel.class, "failSpliceIfClosed(...)");
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
private Queue<SpliceInTask> spliceQueue;
// Lazy init these if we need to splice(...)
@ -106,6 +93,12 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
flags |= Native.EPOLLRDHUP;
}
AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
super(parent, fd, Native.EPOLLIN, remote);
// Add EPOLLRDHUP so we are notified once the remote peer close the connection.
flags |= Native.EPOLLRDHUP;
}
protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
super(null, fd, Native.EPOLLIN, active);
// Add EPOLLRDHUP so we are notified once the remote peer close the connection.
@ -687,19 +680,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
@Override
protected void doClose() throws Exception {
try {
ChannelPromise promise = connectPromise;
if (promise != null) {
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
connectPromise = null;
}
ScheduledFuture<?> future = connectTimeoutFuture;
if (future != null) {
future.cancel(false);
connectTimeoutFuture = null;
}
// Calling super.doClose() first so splceTo(...) will fail on next call.
// Calling super.doClose() first so spliceTo(...) will fail on next call.
super.doClose();
} finally {
safeClosePipe(pipeIn);
@ -721,29 +702,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
}
}
/**
* Connect to the remote peer
*/
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
socket.bind(localAddress);
}
boolean success = false;
try {
boolean connected = socket.connect(remoteAddress);
if (!connected) {
setFlag(Native.EPOLLOUT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
private static void safeClosePipe(FileDescriptor fd) {
if (fd != null) {
try {
@ -781,148 +739,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
}
}
@Override
public void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractEpollStreamChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
closeIfClosed();
promise.tryFailure(annotateConnectException(t, remoteAddress));
}
}
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
active = true;
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case.
boolean active = isActive();
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && active) {
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(cause);
closeIfClosed();
}
private void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
assert eventLoop().inEventLoop();
boolean connectStillInProgress = false;
try {
boolean wasActive = isActive();
if (!doFinishConnect()) {
connectStillInProgress = true;
return;
}
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
if (!connectStillInProgress) {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
}
@Override
void epollOutReady() {
if (connectPromise != null) {
// pending connect which is now complete so handle it.
finishConnect();
} else {
super.epollOutReady();
}
}
/**
* Finish the connect
*/
boolean doFinishConnect() throws Exception {
if (socket.finishConnect()) {
clearFlag(Native.EPOLLOUT);
return true;
} else {
setFlag(Native.EPOLLOUT);
return false;
}
}
@Override
EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
return new EpollRecvByteAllocatorStreamingHandle(handle);

View File

@ -30,7 +30,6 @@ import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.unix.DatagramSocketAddress;
import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import java.io.IOException;
@ -40,7 +39,6 @@ import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.NotYetConnectedException;
import java.util.ArrayList;
import java.util.List;
@ -59,10 +57,8 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
StringUtil.simpleClassName(ByteBuf.class) + ')';
private volatile InetSocketAddress local;
private volatile InetSocketAddress remote;
private volatile boolean connected;
private final EpollDatagramChannelConfig config;
private volatile boolean connected;
public EpollDatagramChannel() {
super(newSocketDgram(), Native.EPOLLIN);
@ -75,9 +71,6 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
EpollDatagramChannel(LinuxSocket fd) {
super(null, fd, Native.EPOLLIN, true);
// As we create an EpollDatagramChannel from a FileDescriptor we should try to obtain the remote and local
// address from it. This is needed as the FileDescriptor may be bound already.
local = fd.localAddress();
config = new EpollDatagramChannelConfig(this);
}
@ -117,8 +110,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
try {
return joinGroup(
multicastAddress,
NetworkInterface.getByInetAddress(localAddress().getAddress()),
null, promise);
NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
} catch (SocketException e) {
promise.setFailure(e);
}
@ -261,22 +253,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
return new EpollDatagramChannelUnsafe();
}
@Override
protected InetSocketAddress localAddress0() {
return local;
}
@Override
protected InetSocketAddress remoteAddress0() {
return remote;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
InetSocketAddress addr = (InetSocketAddress) localAddress;
checkResolvable(addr);
socket.bind(addr);
local = socket.localAddress();
super.doBind(localAddress);
active = true;
}
@ -360,30 +339,35 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
return true;
}
if (remoteAddress == null) {
remoteAddress = remote;
if (remoteAddress == null) {
throw new NotYetConnectedException();
}
}
final int writtenBytes;
final long writtenBytes;
if (data.hasMemoryAddress()) {
long memoryAddress = data.memoryAddress();
writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
remoteAddress.getAddress(), remoteAddress.getPort());
if (remoteAddress == null) {
writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
} else {
writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
remoteAddress.getAddress(), remoteAddress.getPort());
}
} else if (data.nioBufferCount() > 1) {
IovArray array = ((EpollEventLoop) eventLoop()).cleanArray();
array.add(data);
int cnt = array.count();
assert cnt != 0;
writtenBytes = socket.sendToAddresses(array.memoryAddress(0),
cnt, remoteAddress.getAddress(), remoteAddress.getPort());
if (remoteAddress == null) {
writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
} else {
writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt,
remoteAddress.getAddress(), remoteAddress.getPort());
}
} else {
ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
remoteAddress.getAddress(), remoteAddress.getPort());
if (remoteAddress == null) {
writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
} else {
writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
remoteAddress.getAddress(), remoteAddress.getPort());
}
}
return writtenBytes > 0;
@ -427,48 +411,27 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
@Override
protected void doDisconnect() throws Exception {
socket.disconnect();
connected = active = false;
}
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (super.doConnect(remoteAddress, localAddress)) {
connected = true;
return true;
}
return false;
}
@Override
protected void doClose() throws Exception {
super.doClose();
connected = false;
}
final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
@Override
public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) {
boolean success = false;
try {
try {
boolean wasActive = isActive();
InetSocketAddress remoteAddress = (InetSocketAddress) remote;
if (local != null) {
InetSocketAddress localAddress = (InetSocketAddress) local;
doBind(localAddress);
}
checkResolvable(remoteAddress);
EpollDatagramChannel.this.remote = remoteAddress;
EpollDatagramChannel.this.local = socket.localAddress();
success = true;
// First notify the promise before notifying the handler.
channelPromise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
} finally {
if (!success) {
doClose();
} else {
connected = true;
}
}
} catch (Throwable cause) {
channelPromise.tryFailure(cause);
}
}
@Override
void epollInReady() {
assert eventLoop().inEventLoop();

View File

@ -37,7 +37,6 @@ import static io.netty.channel.unix.NativeInetAddress.address;
public final class EpollServerSocketChannel extends AbstractEpollServerChannel implements ServerSocketChannel {
private final EpollServerSocketChannelConfig config;
private volatile InetSocketAddress local;
private volatile Collection<InetAddress> tcpMd5SigAddresses = Collections.emptyList();
public EpollServerSocketChannel() {
@ -53,17 +52,11 @@ public final class EpollServerSocketChannel extends AbstractEpollServerChannel i
EpollServerSocketChannel(LinuxSocket fd) {
super(fd);
// As we create an EpollServerSocketChannel from a FileDescriptor we should try to obtain the remote and local
// address from it. This is needed as the FileDescriptor may be bound already.
local = fd.localAddress();
config = new EpollServerSocketChannelConfig(this);
}
EpollServerSocketChannel(LinuxSocket fd, boolean active) {
super(fd, active);
// As we create an EpollServerSocketChannel from a FileDescriptor we should try to obtain the remote and local
// address from it. This is needed as the FileDescriptor may be bound already.
local = fd.localAddress();
config = new EpollServerSocketChannelConfig(this);
}
@ -74,10 +67,7 @@ public final class EpollServerSocketChannel extends AbstractEpollServerChannel i
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
InetSocketAddress addr = (InetSocketAddress) localAddress;
checkResolvable(addr);
socket.bind(addr);
local = socket.localAddress();
super.doBind(localAddress);
if (Native.IS_SUPPORTING_TCP_FASTOPEN && config.getTcpFastopen() > 0) {
socket.setTcpFastOpen(config.getTcpFastopen());
}
@ -100,11 +90,6 @@ public final class EpollServerSocketChannel extends AbstractEpollServerChannel i
return config;
}
@Override
protected InetSocketAddress localAddress0() {
return local;
}
@Override
protected Channel newChildChannel(int fd, byte[] address, int offset, int len) throws Exception {
return new EpollSocketChannel(this, new LinuxSocket(fd), address(address, offset, len));

View File

@ -20,14 +20,10 @@ import io.netty.channel.ChannelException;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.AlreadyConnectedException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
@ -43,25 +39,8 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
private final EpollSocketChannelConfig config;
private volatile InetSocketAddress local;
private volatile InetSocketAddress remote;
private InetSocketAddress requestedRemote;
private volatile Collection<InetAddress> tcpMd5SigAddresses = Collections.emptyList();
EpollSocketChannel(Channel parent, LinuxSocket fd, InetSocketAddress remote) {
super(parent, fd);
config = new EpollSocketChannelConfig(this);
// Directly cache the remote and local addresses
// See https://github.com/netty/netty/issues/2359
this.remote = remote;
local = fd.localAddress();
if (parent instanceof EpollServerSocketChannel) {
tcpMd5SigAddresses = ((EpollServerSocketChannel) parent).tcpMd5SigAddresses();
}
}
public EpollSocketChannel() {
super(newSocketStream(), false);
config = new EpollSocketChannelConfig(this);
@ -69,22 +48,23 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
public EpollSocketChannel(int fd) {
super(fd);
// As we create an EpollSocketChannel from a FileDescriptor we should try to obtain the remote and local
// address from it. This is needed as the FileDescriptor may be bound/connected already.
remote = socket.remoteAddress();
local = socket.localAddress();
config = new EpollSocketChannelConfig(this);
}
EpollSocketChannel(LinuxSocket fd, boolean active) {
super(fd, active);
// As we create an EpollSocketChannel from a FileDescriptor we should try to obtain the remote and local
// address from it. This is needed as the FileDescriptor may be bound/connected already.
remote = fd.remoteAddress();
local = fd.localAddress();
config = new EpollSocketChannelConfig(this);
}
EpollSocketChannel(Channel parent, LinuxSocket fd, InetSocketAddress remoteAddress) {
super(parent, fd, remoteAddress);
config = new EpollSocketChannelConfig(this);
if (parent instanceof EpollServerSocketChannel) {
tcpMd5SigAddresses = ((EpollServerSocketChannel) parent).tcpMd5SigAddresses();
}
}
/**
* Returns the {@code TCP_INFO} for the current socket. See <a href="http://linux.die.net/man/7/tcp">man 7 tcp</a>.
*/
@ -115,23 +95,6 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
return (InetSocketAddress) super.localAddress();
}
@Override
protected SocketAddress localAddress0() {
return local;
}
@Override
protected SocketAddress remoteAddress0() {
return remote;
}
@Override
protected void doBind(SocketAddress local) throws Exception {
InetSocketAddress localAddress = (InetSocketAddress) local;
socket.bind(localAddress);
this.local = socket.localAddress();
}
@Override
public EpollSocketChannelConfig config() {
return config;
@ -147,54 +110,6 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
return new EpollSocketChannelUnsafe();
}
private static InetSocketAddress computeRemoteAddr(InetSocketAddress remoteAddr, InetSocketAddress osRemoteAddr) {
if (osRemoteAddr != null) {
if (PlatformDependent.javaVersion() >= 7) {
try {
// Only try to construct a new InetSocketAddress if we using java >= 7 as getHostString() does not
// exists in earlier releases and so the retrieval of the hostname could block the EventLoop if a
// reverse lookup would be needed.
return new InetSocketAddress(InetAddress.getByAddress(remoteAddr.getHostString(),
osRemoteAddr.getAddress().getAddress()),
osRemoteAddr.getPort());
} catch (UnknownHostException ignore) {
// Should never happen but fallback to osRemoteAddr anyway.
}
}
return osRemoteAddr;
}
return remoteAddr;
}
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
checkResolvable((InetSocketAddress) localAddress);
}
InetSocketAddress remoteAddr = (InetSocketAddress) remoteAddress;
checkResolvable(remoteAddr);
if (remote != null) {
// Check if already connected before trying to connect. This is needed as connect(...) will not return -1
// and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished
// later.
throw new AlreadyConnectedException();
}
boolean connected = super.doConnect(remoteAddress, localAddress);
if (connected) {
remote = computeRemoteAddr(remoteAddr, socket.remoteAddress());
} else {
// Store for later usage in doFinishConnect()
requestedRemote = remoteAddr;
}
// We always need to set the localAddress even if not connected yet as the bind already took place.
//
// See https://github.com/netty/netty/issues/3463
local = socket.localAddress();
return connected;
}
private final class EpollSocketChannelUnsafe extends EpollStreamUnsafe {
@Override
protected Executor prepareToClose() {
@ -216,19 +131,9 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
}
return null;
}
@Override
boolean doFinishConnect() throws Exception {
if (super.doFinishConnect()) {
remote = computeRemoteAddr(requestedRemote, socket.remoteAddress());
requestedRemote = null;
return true;
}
return false;
}
}
void setTcpMd5Sig(Map<InetAddress, byte[]> keys) throws IOException {
this.tcpMd5SigAddresses = TcpMd5Util.newTcpMd5Sigs(this, tcpMd5SigAddresses, keys);
tcpMd5SigAddresses = TcpMd5Util.newTcpMd5Sigs(this, tcpMd5SigAddresses, keys);
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.DatagramConnectNotExistsTest;
import java.util.List;
public class EpollDatagramConnectNotExistsTest extends DatagramConnectNotExistsTest {
@Override
protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.datagramSocket();
}
}

View File

@ -179,6 +179,18 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
);
}
@Override
public List<BootstrapFactory<Bootstrap>> datagramSocket() {
return Collections.<BootstrapFactory<Bootstrap>>singletonList(
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollDatagramChannel.class);
}
}
);
}
public boolean isServerFastOpen() {
return AccessController.doPrivileged(new PrivilegedAction<Integer>() {
@Override

View File

@ -23,7 +23,11 @@ import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
@ -34,14 +38,28 @@ import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AlreadyConnectedException;
import java.nio.channels.ConnectionPendingException;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.UnresolvedAddressException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
final BsdSocket socket;
private boolean readFilterEnabled = true;
private boolean writeFilterEnabled;
@ -57,6 +75,8 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
long jniSelfPtr;
protected volatile boolean active;
private volatile SocketAddress local;
private volatile SocketAddress remote;
AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) {
this(parent, fd, active, false);
@ -67,6 +87,22 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
socket = checkNotNull(fd, "fd");
this.active = active;
this.writeFilterEnabled = writeFilterEnabled;
if (active) {
// Directly cache the remote and local addresses
// See https://github.com/netty/netty/issues/2359
local = fd.localAddress();
remote = fd.remoteAddress();
}
}
AbstractKQueueChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
super(parent);
socket = checkNotNull(fd, "fd");
active = true;
// Directly cache the remote and local addresses
// See https://github.com/netty/netty/issues/2359
this.remote = remote;
local = fd.localAddress();
}
static boolean isSoErrorZero(BsdSocket fd) {
@ -393,12 +429,14 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
}
}
void writeReady() {
if (socket.isOutputShutdown()) {
return;
final void writeReady() {
if (connectPromise != null) {
// pending connect which is now complete so handle it.
finishConnect();
} else if (!socket.isOutputShutdown()) {
// directly call super.flush0() to force a flush now
super.flush0();
}
// directly call super.flush0() to force a flush now
super.flush0();
}
/**
@ -478,5 +516,209 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
pipeline().fireUserEventTriggered(evt);
close(voidPromise());
}
@Override
public void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
closeIfClosed();
promise.tryFailure(annotateConnectException(t, remoteAddress));
}
}
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
active = true;
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case.
boolean active = isActive();
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && active) {
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(cause);
closeIfClosed();
}
private void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
assert eventLoop().inEventLoop();
boolean connectStillInProgress = false;
try {
boolean wasActive = isActive();
if (!doFinishConnect()) {
connectStillInProgress = true;
return;
}
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
if (!connectStillInProgress) {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
}
private boolean doFinishConnect() throws Exception {
if (socket.finishConnect()) {
writeFilter(false);
if (requestedRemoteAddress instanceof InetSocketAddress) {
remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
}
requestedRemoteAddress = null;
return true;
}
writeFilter(true);
return false;
}
}
@Override
protected void doBind(SocketAddress local) throws Exception {
if (local instanceof InetSocketAddress) {
checkResolvable((InetSocketAddress) local);
}
socket.bind(local);
this.local = socket.localAddress();
}
/**
* Connect to the remote peer
*/
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress instanceof InetSocketAddress) {
checkResolvable((InetSocketAddress) localAddress);
}
InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
? (InetSocketAddress) remoteAddress : null;
if (remoteSocketAddr != null) {
checkResolvable(remoteSocketAddr);
}
if (remote != null) {
// Check if already connected before trying to connect. This is needed as connect(...) will not return -1
// and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished
// later.
throw new AlreadyConnectedException();
}
if (localAddress != null) {
socket.bind(localAddress);
}
boolean connected = doConnect0(remoteAddress);
if (connected) {
remote = remoteSocketAddr == null ?
remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
}
// We always need to set the localAddress even if not connected yet as the bind already took place.
//
// See https://github.com/netty/netty/issues/3463
local = socket.localAddress();
return connected;
}
private boolean doConnect0(SocketAddress remote) throws Exception {
boolean success = false;
try {
boolean connected = socket.connect(remote);
if (!connected) {
writeFilter(true);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
@Override
protected SocketAddress localAddress0() {
return local;
}
@Override
protected SocketAddress remoteAddress0() {
return remote;
}
}

View File

@ -72,18 +72,17 @@ public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel
abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception;
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
throw new UnsupportedOperationException();
}
final class KQueueServerSocketUnsafe extends AbstractKQueueUnsafe {
// Will hold the remote address after accept(...) was successful.
// We need 24 bytes for the address as maximum + 1 byte for storing the capacity.
// So use 26 bytes as it's a power of two.
private final byte[] acceptedAddress = new byte[26];
@Override
public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) {
// Connect not supported by ServerChannel implementations
channelPromise.setFailure(new UnsupportedOperationException());
}
@Override
void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
assert eventLoop().inEventLoop();

View File

@ -20,12 +20,10 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.EventLoop;
import io.netty.channel.FileRegion;
@ -35,37 +33,31 @@ import io.netty.channel.unix.SocketWritableByteChannel;
import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.UnstableApi;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ConnectionPendingException;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@UnstableApi
public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel implements DuplexChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractKQueueStreamChannel.class, "doClose()");
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
private WritableByteChannel byteChannel;
AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, boolean active) {
super(parent, fd, active, true);
}
AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
super(parent, fd, remote);
}
AbstractKQueueStreamChannel(BsdSocket fd) {
this(null, fd, isSoErrorZero(fd));
}
@ -513,47 +505,6 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
return promise;
}
@Override
protected void doClose() throws Exception {
ChannelPromise promise = connectPromise;
if (promise != null) {
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
connectPromise = null;
}
ScheduledFuture<?> future = connectTimeoutFuture;
if (future != null) {
future.cancel(false);
connectTimeoutFuture = null;
}
// Calling super.doClose() first so splceTo(...) will fail on next call.
super.doClose();
}
/**
* Connect to the remote peer
*/
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
socket.bind(localAddress);
}
boolean success = false;
try {
boolean connected = socket.connect(remoteAddress);
if (!connected) {
writeFilter(true);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
class KQueueStreamUnsafe extends AbstractKQueueUnsafe {
// Overridden here just to be able to access this method from AbstractKQueueStreamChannel
@Override
@ -622,145 +573,6 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
}
}
@Override
void writeReady() {
if (connectPromise != null) {
// pending connect which is now complete so handle it.
finishConnect();
} else {
super.writeReady();
}
}
@Override
public void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractKQueueStreamChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
closeIfClosed();
promise.tryFailure(annotateConnectException(t, remoteAddress));
}
}
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
active = true;
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case.
boolean active = isActive();
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && active) {
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(cause);
closeIfClosed();
}
private void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
assert eventLoop().inEventLoop();
boolean connectStillInProgress = false;
try {
boolean wasActive = isActive();
if (!doFinishConnect()) {
connectStillInProgress = true;
return;
}
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
if (!connectStillInProgress) {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
}
boolean doFinishConnect() throws Exception {
if (socket.finishConnect()) {
writeFilter(false);
return true;
} else {
writeFilter(true);
return false;
}
}
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
KQueueRecvByteAllocatorHandle allocHandle) {
if (byteBuf != null) {

View File

@ -30,7 +30,6 @@ import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.unix.DatagramSocketAddress;
import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.UnstableApi;
@ -41,7 +40,6 @@ import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.NotYetConnectedException;
import java.util.ArrayList;
import java.util.List;
@ -57,8 +55,6 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement
StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
StringUtil.simpleClassName(ByteBuf.class) + ')';
private volatile InetSocketAddress local;
private volatile InetSocketAddress remote;
private volatile boolean connected;
private final KQueueDatagramChannelConfig config;
@ -74,9 +70,6 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement
KQueueDatagramChannel(BsdSocket socket, boolean active) {
super(null, socket, active);
config = new KQueueDatagramChannelConfig(this);
// As we create an EpollDatagramChannel from a FileDescriptor we should try to obtain the remote and local
// address from it. This is needed as the FileDescriptor may be bound already.
local = socket.localAddress();
}
@Override
@ -259,22 +252,9 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement
return new KQueueDatagramChannelUnsafe();
}
@Override
protected InetSocketAddress localAddress0() {
return local;
}
@Override
protected InetSocketAddress remoteAddress0() {
return remote;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
InetSocketAddress addr = (InetSocketAddress) localAddress;
checkResolvable(addr);
socket.bind(addr);
local = socket.localAddress();
super.doBind(localAddress);
active = true;
}
@ -332,30 +312,35 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement
return true;
}
if (remoteAddress == null) {
remoteAddress = remote;
if (remoteAddress == null) {
throw new NotYetConnectedException();
}
}
final int writtenBytes;
final long writtenBytes;
if (data.hasMemoryAddress()) {
long memoryAddress = data.memoryAddress();
writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
remoteAddress.getAddress(), remoteAddress.getPort());
if (remoteAddress == null) {
writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
} else {
writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
remoteAddress.getAddress(), remoteAddress.getPort());
}
} else if (data.nioBufferCount() > 1) {
IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
array.add(data);
int cnt = array.count();
assert cnt != 0;
writtenBytes = socket.sendToAddresses(array.memoryAddress(0),
cnt, remoteAddress.getAddress(), remoteAddress.getPort());
if (remoteAddress == null) {
writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
} else {
writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt,
remoteAddress.getAddress(), remoteAddress.getPort());
}
} else {
ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
remoteAddress.getAddress(), remoteAddress.getPort());
if (remoteAddress == null) {
writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
} else {
writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
remoteAddress.getAddress(), remoteAddress.getPort());
}
}
return writtenBytes > 0;
@ -399,48 +384,27 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement
@Override
protected void doDisconnect() throws Exception {
socket.disconnect();
connected = active = false;
}
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (super.doConnect(remoteAddress, localAddress)) {
connected = true;
return true;
}
return false;
}
@Override
protected void doClose() throws Exception {
super.doClose();
connected = false;
}
final class KQueueDatagramChannelUnsafe extends AbstractKQueueUnsafe {
@Override
public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) {
boolean success = false;
try {
try {
boolean wasActive = isActive();
InetSocketAddress remoteAddress = (InetSocketAddress) remote;
if (local != null) {
InetSocketAddress localAddress = (InetSocketAddress) local;
doBind(localAddress);
}
checkResolvable(remoteAddress);
KQueueDatagramChannel.this.remote = remoteAddress;
KQueueDatagramChannel.this.local = socket.localAddress();
success = true;
// First notify the promise before notifying the handler.
channelPromise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
} finally {
if (!success) {
doClose();
} else {
connected = true;
}
}
} catch (Throwable cause) {
channelPromise.tryFailure(cause);
}
}
@Override
void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
assert eventLoop().inEventLoop();

View File

@ -29,7 +29,6 @@ import static io.netty.channel.unix.NativeInetAddress.address;
@UnstableApi
public final class KQueueServerSocketChannel extends AbstractKQueueServerChannel implements ServerSocketChannel {
private final KQueueServerSocketChannelConfig config;
private volatile InetSocketAddress local;
public KQueueServerSocketChannel() {
super(newSocketStream(), false);
@ -45,18 +44,11 @@ public final class KQueueServerSocketChannel extends AbstractKQueueServerChannel
KQueueServerSocketChannel(BsdSocket fd) {
super(fd);
config = new KQueueServerSocketChannelConfig(this);
// As we create an KQueueServerSocketChannel from a FileDescriptor we should try to obtain the remote and local
// address from it. This is needed as the FileDescriptor may be bound already.
local = fd.localAddress();
}
KQueueServerSocketChannel(BsdSocket fd, boolean active) {
super(fd, active);
config = new KQueueServerSocketChannelConfig(this);
// As we create an KQueueServerSocketChannel from a FileDescriptor we should try to obtain the remote and local
// address from it. This is needed as the FileDescriptor may be bound already.
local = fd.localAddress();
}
@Override
@ -66,10 +58,8 @@ public final class KQueueServerSocketChannel extends AbstractKQueueServerChannel
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
InetSocketAddress addr = (InetSocketAddress) localAddress;
checkResolvable(addr);
socket.bind(addr);
local = socket.localAddress();
super.doBind(localAddress);
// TODO(scott): tcp fast open here!
socket.listen(config.getBacklog());
active = true;
@ -90,11 +80,6 @@ public final class KQueueServerSocketChannel extends AbstractKQueueServerChannel
return config;
}
@Override
protected InetSocketAddress localAddress0() {
return local;
}
@Override
protected Channel newChildChannel(int fd, byte[] address, int offset, int len) throws Exception {
return new KQueueSocketChannel(this, new BsdSocket(fd), address(address, offset, len));

View File

@ -19,24 +19,15 @@ import io.netty.channel.Channel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.UnstableApi;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.AlreadyConnectedException;
import java.util.concurrent.Executor;
@UnstableApi
public final class KQueueSocketChannel extends AbstractKQueueStreamChannel implements SocketChannel {
private final KQueueSocketChannelConfig config;
private volatile InetSocketAddress local;
private volatile InetSocketAddress remote;
private InetSocketAddress requestedRemote;
public KQueueSocketChannel() {
super(null, BsdSocket.newSocketStream(), false);
config = new KQueueSocketChannelConfig(this);
@ -44,20 +35,12 @@ public final class KQueueSocketChannel extends AbstractKQueueStreamChannel imple
public KQueueSocketChannel(int fd) {
super(new BsdSocket(fd));
// As we create an EpollSocketChannel from a FileDescriptor we should try to obtain the remote and local
// address from it. This is needed as the FileDescriptor may be bound/connected already.
remote = socket.remoteAddress();
local = socket.localAddress();
config = new KQueueSocketChannelConfig(this);
}
KQueueSocketChannel(Channel parent, BsdSocket fd, InetSocketAddress remote) {
super(parent, fd, true);
KQueueSocketChannel(Channel parent, BsdSocket fd, InetSocketAddress remoteAddress) {
super(parent, fd, remoteAddress);
config = new KQueueSocketChannelConfig(this);
// Directly cache the remote and local addresses
// See https://github.com/netty/netty/issues/2359
this.remote = remote;
local = fd.localAddress();
}
@Override
@ -70,23 +53,6 @@ public final class KQueueSocketChannel extends AbstractKQueueStreamChannel imple
return (InetSocketAddress) super.localAddress();
}
@Override
protected SocketAddress localAddress0() {
return local;
}
@Override
protected SocketAddress remoteAddress0() {
return remote;
}
@Override
protected void doBind(SocketAddress local) throws Exception {
InetSocketAddress localAddress = (InetSocketAddress) local;
socket.bind(localAddress);
this.local = socket.localAddress();
}
@Override
public KQueueSocketChannelConfig config() {
return config;
@ -102,54 +68,6 @@ public final class KQueueSocketChannel extends AbstractKQueueStreamChannel imple
return new KQueueSocketChannelUnsafe();
}
private static InetSocketAddress computeRemoteAddr(InetSocketAddress remoteAddr, InetSocketAddress osRemoteAddr) {
if (osRemoteAddr != null) {
if (PlatformDependent.javaVersion() >= 7) {
try {
// Only try to construct a new InetSocketAddress if we using java >= 7 as getHostString() does not
// exists in earlier releases and so the retrieval of the hostname could block the EventLoop if a
// reverse lookup would be needed.
return new InetSocketAddress(InetAddress.getByAddress(remoteAddr.getHostString(),
osRemoteAddr.getAddress().getAddress()),
osRemoteAddr.getPort());
} catch (UnknownHostException ignore) {
// Should never happen but fallback to osRemoteAddr anyway.
}
}
return osRemoteAddr;
}
return remoteAddr;
}
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
checkResolvable((InetSocketAddress) localAddress);
}
InetSocketAddress remoteAddr = (InetSocketAddress) remoteAddress;
checkResolvable(remoteAddr);
if (remote != null) {
// Check if already connected before trying to connect. This is needed as connect(...) will not return -1
// and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished
// later.
throw new AlreadyConnectedException();
}
boolean connected = super.doConnect(remoteAddress, localAddress);
if (connected) {
remote = computeRemoteAddr(remoteAddr, socket.remoteAddress());
} else {
// Store for later usage in doFinishConnect()
requestedRemote = remoteAddr;
}
// We always need to set the localAddress even if not connected yet as the bind already took place.
//
// See https://github.com/netty/netty/issues/3463
local = socket.localAddress();
return connected;
}
private final class KQueueSocketChannelUnsafe extends KQueueStreamUnsafe {
@Override
protected Executor prepareToClose() {
@ -171,15 +89,5 @@ public final class KQueueSocketChannel extends AbstractKQueueStreamChannel imple
}
return null;
}
@Override
boolean doFinishConnect() throws Exception {
if (super.doFinishConnect()) {
remote = computeRemoteAddr(requestedRemote, socket.remoteAddress());
requestedRemote = null;
return true;
}
return false;
}
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.kqueue;
import io.netty.bootstrap.Bootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.DatagramConnectNotExistsTest;
import java.util.List;
public class KQueueDatagramConnectNotExistsTest extends DatagramConnectNotExistsTest {
@Override
protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() {
return KQueueSocketTestPermutation.INSTANCE.datagramSocket();
}
}

View File

@ -25,7 +25,6 @@ import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.channel.unix.Socket;
import io.netty.channel.unix.tests.UnixTestUtils;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory;
@ -34,8 +33,6 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -166,6 +163,17 @@ class KQueueSocketTestPermutation extends SocketTestPermutation {
);
}
@Override
public List<BootstrapFactory<Bootstrap>> datagramSocket() {
return Collections.<BootstrapFactory<Bootstrap>>singletonList(
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(KQUEUE_WORKER_GROUP).channel(KQueueDatagramChannel.class);
}
}
);
}
public static DomainSocketAddress newSocketAddress() {
return UnixTestUtils.newSocketAddress();
}

View File

@ -24,6 +24,7 @@
static jclass runtimeExceptionClass = NULL;
static jclass channelExceptionClass = NULL;
static jclass ioExceptionClass = NULL;
static jclass portUnreachableExceptionClass = NULL;
static jclass closedChannelExceptionClass = NULL;
static jmethodID closedChannelExceptionMethodId = NULL;
@ -56,6 +57,10 @@ void netty_unix_errors_throwIOException(JNIEnv* env, char* message) {
(*env)->ThrowNew(env, ioExceptionClass, message);
}
void netty_unix_errors_throwPortUnreachableException(JNIEnv* env, char* message) {
(*env)->ThrowNew(env, portUnreachableExceptionClass, message);
}
void netty_unix_errors_throwIOExceptionErrorNo(JNIEnv* env, char* message, int errorNumber) {
char* allocatedMessage = exceptionMessage(message, errorNumber);
(*env)->ThrowNew(env, ioExceptionClass, allocatedMessage);
@ -207,6 +212,18 @@ jint netty_unix_errors_JNI_OnLoad(JNIEnv* env, const char* packagePrefix) {
return JNI_ERR;
}
jclass localPortUnreachableExceptionClass = (*env)->FindClass(env, "java/net/PortUnreachableException");
if (localPortUnreachableExceptionClass == NULL) {
// pending exception...
return JNI_ERR;
}
portUnreachableExceptionClass = (jclass) (*env)->NewGlobalRef(env, localPortUnreachableExceptionClass);
if (portUnreachableExceptionClass == NULL) {
// out-of-memory!
netty_unix_errors_throwOutOfMemoryError(env);
return JNI_ERR;
}
return NETTY_JNI_VERSION;
}
@ -224,6 +241,10 @@ void netty_unix_errors_JNI_OnUnLoad(JNIEnv* env) {
(*env)->DeleteGlobalRef(env, ioExceptionClass);
ioExceptionClass = NULL;
}
if (portUnreachableExceptionClass != NULL) {
(*env)->DeleteGlobalRef(env, portUnreachableExceptionClass);
portUnreachableExceptionClass = NULL;
}
if (closedChannelExceptionClass != NULL) {
(*env)->DeleteGlobalRef(env, closedChannelExceptionClass);
closedChannelExceptionClass = NULL;

View File

@ -23,6 +23,7 @@ void netty_unix_errors_throwRuntimeExceptionErrorNo(JNIEnv* env, char* message,
void netty_unix_errors_throwChannelExceptionErrorNo(JNIEnv* env, char* message, int errorNumber);
void netty_unix_errors_throwIOException(JNIEnv* env, char* message);
void netty_unix_errors_throwIOExceptionErrorNo(JNIEnv* env, char* message, int errorNumber);
void netty_unix_errors_throwPortUnreachableException(JNIEnv* env, char* message);
void netty_unix_errors_throwClosedChannelException(JNIEnv* env);
void netty_unix_errors_throwOutOfMemoryError(JNIEnv* env);

View File

@ -305,6 +305,10 @@ static jobject _recvFrom(JNIEnv* env, jint fd, void* buffer, jint pos, jint limi
netty_unix_errors_throwClosedChannelException(env);
return NULL;
}
if (err == ECONNREFUSED) {
netty_unix_errors_throwPortUnreachableException(env, "recvfrom() failed");
return NULL;
}
netty_unix_errors_throwIOExceptionErrorNo(env, "recvfrom() failed: ", err);
return NULL;
}
@ -412,6 +416,38 @@ static jint netty_unix_socket_finishConnect(JNIEnv* env, jclass clazz, jint fd)
return -optval;
}
static jint netty_unix_socket_disconnect(JNIEnv* env, jclass clazz, jint fd) {
struct sockaddr_storage addr;
int len;
memset(&addr, 0, sizeof(addr));
// You can disconnect connection-less sockets by using AF_UNSPEC.
// See man 2 connect.
if (socketType == AF_INET6) {
struct sockaddr_in6* ip6addr = (struct sockaddr_in6*) &addr;
ip6addr->sin6_family = AF_UNSPEC;
len = sizeof(struct sockaddr_in6);
} else {
struct sockaddr_in* ipaddr = (struct sockaddr_in*) &addr;
ipaddr->sin_family = AF_UNSPEC;
len = sizeof(struct sockaddr_in);
}
int res;
int err;
do {
res = connect(fd, (struct sockaddr*) &addr, len);
} while (res == -1 && ((err = errno) == EINTR));
// EAFNOSUPPORT is harmless in this case.
// See http://www.unix.com/man-page/osx/2/connect/
if (res < 0 && err != EAFNOSUPPORT) {
return -err;
}
return 0;
}
static jint netty_unix_socket_accept(JNIEnv* env, jclass clazz, jint fd, jbyteArray acceptedAddress) {
jint socketFd;
jsize len;
@ -822,6 +858,7 @@ static const JNINativeMethod fixed_method_table[] = {
{ "listen", "(II)I", (void *) netty_unix_socket_listen },
{ "connect", "(I[BII)I", (void *) netty_unix_socket_connect },
{ "finishConnect", "(I)I", (void *) netty_unix_socket_finishConnect },
{ "disconnect", "(I)I", (void *) netty_unix_socket_disconnect},
{ "accept", "(I[B)I", (void *) netty_unix_socket_accept },
{ "remoteAddress", "(I)[B", (void *) netty_unix_socket_remoteAddress },
{ "localAddress", "(I)[B", (void *) netty_unix_socket_localAddress },

View File

@ -22,11 +22,13 @@ import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.PortUnreachableException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import static io.netty.channel.unix.Errors.ERRNO_EAGAIN_NEGATIVE;
import static io.netty.channel.unix.Errors.ERROR_ECONNREFUSED_NEGATIVE;
import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
import static io.netty.channel.unix.Errors.ERRNO_EWOULDBLOCK_NEGATIVE;
import static io.netty.channel.unix.Errors.ioResult;
@ -141,6 +143,9 @@ public class Socket extends FileDescriptor {
if (res >= 0) {
return res;
}
if (res == ERROR_ECONNREFUSED_NEGATIVE) {
throw new PortUnreachableException("sendTo failed");
}
return ioResult("sendTo", res, SEND_TO_CONNECTION_RESET_EXCEPTION, SEND_TO_CLOSED_CHANNEL_EXCEPTION);
}
@ -162,6 +167,9 @@ public class Socket extends FileDescriptor {
if (res >= 0) {
return res;
}
if (res == ERROR_ECONNREFUSED_NEGATIVE) {
throw new PortUnreachableException("sendToAddress failed");
}
return ioResult("sendToAddress", res,
SEND_TO_ADDRESS_CONNECTION_RESET_EXCEPTION, SEND_TO_ADDRESS_CLOSED_CHANNEL_EXCEPTION);
}
@ -183,6 +191,10 @@ public class Socket extends FileDescriptor {
if (res >= 0) {
return res;
}
if (res == ERROR_ECONNREFUSED_NEGATIVE) {
throw new PortUnreachableException("sendToAddresses failed");
}
return ioResult("sendToAddresses", res,
CONNECTION_RESET_EXCEPTION_SENDMSG, SEND_TO_ADDRESSES_CLOSED_CHANNEL_EXCEPTION);
}
@ -257,6 +269,13 @@ public class Socket extends FileDescriptor {
return true;
}
public final void disconnect() throws IOException {
int res = disconnect(fd);
if (res < 0) {
throwConnectException("disconnect", FINISH_CONNECT_REFUSED_EXCEPTION, res);
}
}
public final void bind(SocketAddress socketAddress) throws IOException {
if (socketAddress instanceof InetSocketAddress) {
InetSocketAddress addr = (InetSocketAddress) socketAddress;
@ -432,6 +451,7 @@ public class Socket extends FileDescriptor {
private static native int connect(int fd, byte[] address, int scopeId, int port);
private static native int connectDomainSocket(int fd, byte[] path);
private static native int finishConnect(int fd);
private static native int disconnect(int fd);
private static native int bind(int fd, byte[] address, int scopeId, int port);
private static native int bindDomainSocket(int fd, byte[] path);
private static native int listen(int fd, int backlog);

View File

@ -16,6 +16,11 @@
package io.netty.channel.unix;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.PlatformDependent;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import static io.netty.channel.unix.Limits.IOV_MAX;
@ -35,4 +40,23 @@ public final class UnixChannelUtil {
static boolean isBufferCopyNeededForWrite(ByteBuf byteBuf, int iovMax) {
return !byteBuf.hasMemoryAddress() && (!byteBuf.isDirect() || byteBuf.nioBufferCount() > iovMax);
}
public static InetSocketAddress computeRemoteAddr(InetSocketAddress remoteAddr, InetSocketAddress osRemoteAddr) {
if (osRemoteAddr != null) {
if (PlatformDependent.javaVersion() >= 7) {
try {
// Only try to construct a new InetSocketAddress if we using java >= 7 as getHostString() does not
// exists in earlier releases and so the retrieval of the hostname could block the EventLoop if a
// reverse lookup would be needed.
return new InetSocketAddress(InetAddress.getByAddress(remoteAddr.getHostString(),
osRemoteAddr.getAddress().getAddress()),
osRemoteAddr.getPort());
} catch (UnknownHostException ignore) {
// Should never happen but fallback to osRemoteAddr anyway.
}
}
return osRemoteAddr;
}
return remoteAddr;
}
}