SslHandler flushing with TCP Fast Open fix (#11077)

Motivation:
SslHandler owns the responsibility to flush non-application data
(e.g. handshake, renegotiation, etc.) to the socket. However when
TCP Fast Open is supported but the client_hello cannot be written
in the SYN the client_hello may not always be flushed. SslHandler
may not wrap/flush previously written/flushed data in the event
it was not able to be wrapped due to NEED_UNWRAP state being
encountered in wrap (e.g. peer initiated renegotiation).

Modifications:
- SslHandler to flush in channelActive() if TFO is enabled and
  the client_hello cannot be written in the SYN.
- SslHandler to wrap application data after non-application data
  wrap and handshake status is FINISHED.
- SocketSslEchoTest only flushes when writes are done, and waits
  for the handshake to complete before writing.

Result:
SslHandler flushes handshake data for TFO, and previously flushed
application data after peer initiated renegotiation finishes.
This commit is contained in:
Scott Mitchell 2021-03-14 06:18:27 -07:00 committed by Norman Maurer
parent 7a70fabb07
commit e5ff6216ff
12 changed files with 172 additions and 129 deletions

View File

@ -30,6 +30,7 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
@ -978,7 +979,7 @@ public class SslHandler extends ByteToMessageDecoder {
out = allocateOutNetBuf(ctx, 2048, 1);
}
SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);
HandshakeStatus status = result.getHandshakeStatus();
if (result.bytesProduced() > 0) {
ctx.write(out).addListener((ChannelFutureListener) future -> {
Throwable cause = future.cause();
@ -987,12 +988,18 @@ public class SslHandler extends ByteToMessageDecoder {
}
});
if (inUnwrap) {
// We may be here because we read data and discovered the remote peer initiated a renegotiation
// and this write is to complete the new handshake. The user may have previously done a
// writeAndFlush which wasn't able to wrap data due to needing the pending handshake, so we
// attempt to wrap application data here if any is pending.
if (status == HandshakeStatus.FINISHED && !pendingUnencryptedWrites.isEmpty()) {
wrap(ctx, true);
}
needsFlush = true;
}
out = null;
}
HandshakeStatus status = result.getHandshakeStatus();
switch (status) {
case FINISHED:
setHandshakeSuccess();
@ -1784,27 +1791,26 @@ public class SslHandler extends ByteToMessageDecoder {
* marked as success by this method
*/
private boolean setHandshakeSuccessIfStillHandshaking() {
if (!handshakePromise.isDone()) {
setHandshakeSuccess();
return true;
}
return false;
return setHandshakeSuccess();
}
/**
* Notify all the handshake futures about the successfully handshake
* @return {@code true} if {@link #handshakePromise} was set successfully and a {@link SslHandshakeCompletionEvent}
* was fired. {@code false} otherwise.
*/
private void setHandshakeSuccess() {
boolean notified = handshakePromise.trySuccess(ctx.channel());
SSLSession session = engine.getSession();
// There seems to be a bug in the SSLEngineImpl that is part of the OpenJDK that results in returning
// HandshakeStatus.FINISHED multiple times which is not expected. This only happens in TLSv1.3 so lets
// ensure we only notify once in this case.
//
// This is safe as TLSv1.3 does not support renegotiation and so we should never see two handshake events.
if (notified || !SslUtils.PROTOCOL_TLS_V1_3.equals(session.getProtocol())) {
private boolean setHandshakeSuccess() {
if (readDuringHandshake && !ctx.channel().config().isAutoRead()) {
readDuringHandshake = false;
ctx.read();
}
// Our control flow may invoke this method multiple times for a single FINISHED event. For example
// wrapNonAppData may drain pendingUnencryptedWrites in wrap which transitions to handshake from FINISHED to
// NOT_HANDSHAKING which invokes setHandshakeSuccessIfStillHandshaking, and then wrapNonAppData also directly
// invokes this method.
if (handshakePromise.trySuccess(ctx.channel())) {
if (logger.isDebugEnabled()) {
SSLSession session = engine.getSession();
logger.debug(
"{} HANDSHAKEN: protocol:{} cipher suite:{}",
ctx.channel(),
@ -1812,12 +1818,9 @@ public class SslHandler extends ByteToMessageDecoder {
session.getCipherSuite());
}
ctx.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
return true;
}
if (readDuringHandshake && !ctx.channel().config().isAutoRead()) {
readDuringHandshake = false;
ctx.read();
}
return false;
}
/**
@ -1956,6 +1959,11 @@ public class SslHandler extends ByteToMessageDecoder {
// With TCP Fast Open, we write to the outbound buffer before the TCP connect is established.
// The buffer will then be flushed as part of establishing the connection, saving us a round-trip.
startHandshakeProcessing(active);
// If we weren't able to include client_hello in the TCP SYN (e.g. no token, disabled at the OS) we have to
// flush pending data in the outbound buffer later in channelActive().
final ChannelOutboundBuffer outboundBuffer;
needsFlush |= fastOpen && ((outboundBuffer = channel.unsafe().outboundBuffer()) == null ||
outboundBuffer.totalPendingWriteBytes() > 0);
}
}
@ -1969,6 +1977,8 @@ public class SslHandler extends ByteToMessageDecoder {
handshake(flushAtEnd);
}
applyHandshakeTimeout();
} else if (needsFlush) {
forceFlush(ctx);
}
}

View File

@ -52,21 +52,24 @@ public abstract class RenegotiateTest {
ServerBootstrap sb = new ServerBootstrap();
sb.group(group).channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
.childHandler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(context.newHandler(ch.alloc()));
protected void initChannel(Channel ch) {
SslHandler handler = context.newHandler(ch.alloc());
handler.setHandshakeTimeoutMillis(0);
ch.pipeline().addLast(handler);
ch.pipeline().addLast(new ChannelHandler() {
private boolean renegotiate;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}
@Override
public void userEventTriggered(
final ChannelHandlerContext ctx, Object evt) throws Exception {
final ChannelHandlerContext ctx, Object evt) {
if (!renegotiate && evt instanceof SslHandshakeCompletionEvent) {
SslHandshakeCompletionEvent event = (SslHandshakeCompletionEvent) evt;
@ -77,9 +80,9 @@ public abstract class RenegotiateTest {
handler.renegotiate().addListener((FutureListener<Channel>) future -> {
if (!future.isSuccess()) {
error.compareAndSet(null, future.cause());
latch.countDown();
ctx.close();
}
latch.countDown();
});
} else {
error.compareAndSet(null, event.cause());
@ -102,14 +105,16 @@ public abstract class RenegotiateTest {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(LocalChannel.class)
.handler(new ChannelInitializer<Channel>() {
.handler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(clientContext.newHandler(ch.alloc()));
protected void initChannel(Channel ch) {
SslHandler handler = clientContext.newHandler(ch.alloc());
handler.setHandshakeTimeoutMillis(0);
ch.pipeline().addLast(handler);
ch.pipeline().addLast(new ChannelHandler() {
@Override
public void userEventTriggered(
ChannelHandlerContext ctx, Object evt) throws Exception {
ChannelHandlerContext ctx, Object evt) {
if (evt instanceof SslHandshakeCompletionEvent) {
SslHandshakeCompletionEvent event = (SslHandshakeCompletionEvent) evt;
if (!event.isSuccess()) {

View File

@ -23,6 +23,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
@ -52,10 +53,12 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLEngine;
import static org.hamcrest.MatcherAssert.assertThat;
@ -255,8 +258,7 @@ public class SocketSslEchoTest extends AbstractSocketTest {
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
@SuppressWarnings("deprecation")
public void initChannel(Channel sch) throws Exception {
public void initChannel(Channel sch) {
serverChannel = sch;
if (serverUsesDelegatedTaskExecutor) {
@ -265,6 +267,7 @@ public class SocketSslEchoTest extends AbstractSocketTest {
} else {
serverSslHandler = serverCtx.newHandler(sch.alloc());
}
serverSslHandler.setHandshakeTimeoutMillis(0);
sch.pipeline().addLast("ssl", serverSslHandler);
if (useChunkedWriteHandler) {
@ -274,10 +277,10 @@ public class SocketSslEchoTest extends AbstractSocketTest {
}
});
final CountDownLatch clientHandshakeEventLatch = new CountDownLatch(1);
cb.handler(new ChannelInitializer<Channel>() {
@Override
@SuppressWarnings("deprecation")
public void initChannel(Channel sch) throws Exception {
public void initChannel(Channel sch) {
clientChannel = sch;
if (clientUsesDelegatedTaskExecutor) {
@ -286,12 +289,22 @@ public class SocketSslEchoTest extends AbstractSocketTest {
} else {
clientSslHandler = clientCtx.newHandler(sch.alloc());
}
clientSslHandler.setHandshakeTimeoutMillis(0);
sch.pipeline().addLast("ssl", clientSslHandler);
if (useChunkedWriteHandler) {
sch.pipeline().addLast(new ChunkedWriteHandler());
}
sch.pipeline().addLast("handler", clientHandler);
sch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof SslHandshakeCompletionEvent) {
clientHandshakeEventLatch.countDown();
}
ctx.fireUserEventTriggered(evt);
}
});
}
});
@ -300,9 +313,12 @@ public class SocketSslEchoTest extends AbstractSocketTest {
final Future<Channel> clientHandshakeFuture = clientSslHandler.handshakeFuture();
// Wait for the handshake to complete before we flush anything. SslHandler should flush non-application data.
clientHandshakeFuture.sync();
clientHandshakeEventLatch.await();
clientChannel.writeAndFlush(Unpooled.wrappedBuffer(data, 0, FIRST_MESSAGE_SIZE));
clientSendCounter.set(FIRST_MESSAGE_SIZE);
clientHandshakeFuture.sync();
boolean needsRenegotiation = renegotiation.type == RenegotiationType.CLIENT_INITIATED;
Future<Channel> renegoFuture = null;
@ -457,21 +473,21 @@ public class SocketSslEchoTest extends AbstractSocketTest {
if (!autoRead) {
ctx.read();
}
ctx.fireChannelActive();
}
@Override
public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
try {
ctx.flush();
} finally {
// We intentionally do not ctx.flush() here because we want to verify the SslHandler correctly flushing
// non-application and previously flushed writes internally.
if (!autoRead) {
ctx.read();
}
}
ctx.fireChannelReadComplete();
}
@Override
public final void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
public final void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof SslHandshakeCompletionEvent) {
SslHandshakeCompletionEvent handshakeEvt = (SslHandshakeCompletionEvent) evt;
if (handshakeEvt.cause() != null) {
@ -481,6 +497,7 @@ public class SocketSslEchoTest extends AbstractSocketTest {
negoCounter.incrementAndGet();
logStats("HANDSHAKEN");
}
ctx.fireUserEventTriggered(evt);
}
@Override
@ -528,7 +545,7 @@ public class SocketSslEchoTest extends AbstractSocketTest {
}
@Override
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
public final void channelRegistered(ChannelHandlerContext ctx) {
renegoFuture = null;
}
@ -546,7 +563,7 @@ public class SocketSslEchoTest extends AbstractSocketTest {
if (useCompositeByteBuf) {
buf = Unpooled.compositeBuffer().addComponent(true, buf);
}
ctx.write(buf);
ctx.writeAndFlush(buf);
recvCounter.addAndGet(actual.length);

View File

@ -486,13 +486,10 @@ static jboolean netty_epoll_native_isSupportingRecvmmsg(JNIEnv* env, jclass claz
return JNI_TRUE;
}
static jboolean netty_epoll_native_isSupportingTcpFastopen(JNIEnv* env, jclass clazz) {
static jint netty_epoll_native_tcpFastopenMode(JNIEnv* env, jclass clazz) {
int fastopen = 0;
getSysctlValue("/proc/sys/net/ipv4/tcp_fastopen", &fastopen);
if (fastopen > 0) {
return JNI_TRUE;
}
return JNI_FALSE;
return fastopen;
}
static jint netty_epoll_native_epollet(JNIEnv* env, jclass clazz) {
@ -552,7 +549,7 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = {
{ "tcpMd5SigMaxKeyLen", "()I", (void *) netty_epoll_native_tcpMd5SigMaxKeyLen },
{ "isSupportingSendmmsg", "()Z", (void *) netty_epoll_native_isSupportingSendmmsg },
{ "isSupportingRecvmmsg", "()Z", (void *) netty_epoll_native_isSupportingRecvmmsg },
{ "isSupportingTcpFastopen", "()Z", (void *) netty_epoll_native_isSupportingTcpFastopen },
{ "tcpFastopenMode", "()I", (void *) netty_epoll_native_tcpFastopenMode },
{ "kernelVersion", "()Ljava/lang/String;", (void *) netty_epoll_native_kernelVersion }
};
static const jint statically_referenced_fixed_method_table_size = sizeof(statically_referenced_fixed_method_table) / sizeof(statically_referenced_fixed_method_table[0]);

View File

@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.Map;
import static io.netty.channel.epoll.LinuxSocket.newSocketStream;
import static io.netty.channel.epoll.Native.IS_SUPPORTING_TCP_FASTOPEN_SERVER;
import static io.netty.channel.unix.NativeInetAddress.address;
/**
@ -64,8 +65,9 @@ public final class EpollServerSocketChannel extends AbstractEpollServerChannel i
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
super.doBind(localAddress);
if (Native.IS_SUPPORTING_TCP_FASTOPEN && config.getTcpFastopen() > 0) {
socket.setTcpFastOpen(config.getTcpFastopen());
final int tcpFastopen;
if (IS_SUPPORTING_TCP_FASTOPEN_SERVER && (tcpFastopen = config.getTcpFastopen()) > 0) {
socket.setTcpFastOpen(tcpFastopen);
}
socket.listen(config.getBacklog());
active = true;

View File

@ -35,6 +35,7 @@ import java.util.Map;
import java.util.concurrent.Executor;
import static io.netty.channel.epoll.LinuxSocket.newSocketStream;
import static io.netty.channel.epoll.Native.IS_SUPPORTING_TCP_FASTOPEN_CLIENT;
/**
* {@link SocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
@ -118,7 +119,7 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
@Override
boolean doConnect0(SocketAddress remote) throws Exception {
if (Native.IS_SUPPORTING_TCP_FASTOPEN && config.isTcpFastOpenConnect()) {
if (IS_SUPPORTING_TCP_FASTOPEN_CLIENT && config.isTcpFastOpenConnect()) {
ChannelOutboundBuffer outbound = unsafe().outboundBuffer();
outbound.addFlush();
Object curr;

View File

@ -36,8 +36,8 @@ import static io.netty.channel.epoll.NativeStaticallyReferencedJniMethods.epollo
import static io.netty.channel.epoll.NativeStaticallyReferencedJniMethods.epollrdhup;
import static io.netty.channel.epoll.NativeStaticallyReferencedJniMethods.isSupportingRecvmmsg;
import static io.netty.channel.epoll.NativeStaticallyReferencedJniMethods.isSupportingSendmmsg;
import static io.netty.channel.epoll.NativeStaticallyReferencedJniMethods.isSupportingTcpFastopen;
import static io.netty.channel.epoll.NativeStaticallyReferencedJniMethods.kernelVersion;
import static io.netty.channel.epoll.NativeStaticallyReferencedJniMethods.tcpFastopenMode;
import static io.netty.channel.epoll.NativeStaticallyReferencedJniMethods.tcpMd5SigMaxKeyLen;
import static io.netty.channel.unix.Errors.ioResult;
import static io.netty.channel.unix.Errors.newIOException;
@ -97,7 +97,27 @@ public final class Native {
public static final boolean IS_SUPPORTING_SENDMMSG = isSupportingSendmmsg();
static final boolean IS_SUPPORTING_RECVMMSG = isSupportingRecvmmsg();
static final boolean IS_SUPPORTING_UDP_SEGMENT = isSupportingUdpSegment();
public static final boolean IS_SUPPORTING_TCP_FASTOPEN = isSupportingTcpFastopen();
private static final int TFO_ENABLED_CLIENT_MASK = 0x1;
private static final int TFO_ENABLED_SERVER_MASK = 0x2;
private static final int TCP_FASTOPEN_MODE = tcpFastopenMode();
/**
* <a href ="https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt">tcp_fastopen</a> client mode enabled
* state.
*/
static final boolean IS_SUPPORTING_TCP_FASTOPEN_CLIENT =
(TCP_FASTOPEN_MODE & TFO_ENABLED_CLIENT_MASK) == TFO_ENABLED_CLIENT_MASK;
/**
* <a href ="https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt">tcp_fastopen</a> server mode enabled
* state.
*/
static final boolean IS_SUPPORTING_TCP_FASTOPEN_SERVER =
(TCP_FASTOPEN_MODE & TFO_ENABLED_SERVER_MASK) == TFO_ENABLED_SERVER_MASK;
/**
* @deprecated Use {@link #IS_SUPPORTING_TCP_FASTOPEN_CLIENT} or {@link #IS_SUPPORTING_TCP_FASTOPEN_SERVER}.
*/
@Deprecated
public static final boolean IS_SUPPORTING_TCP_FASTOPEN = IS_SUPPORTING_TCP_FASTOPEN_CLIENT ||
IS_SUPPORTING_TCP_FASTOPEN_SERVER;
public static final int TCP_MD5SIG_MAXKEYLEN = tcpMd5SigMaxKeyLen();
public static final String KERNEL_VERSION = kernelVersion();

View File

@ -41,6 +41,6 @@ final class NativeStaticallyReferencedJniMethods {
static native int uioMaxIov();
static native boolean isSupportingSendmmsg();
static native boolean isSupportingRecvmmsg();
static native boolean isSupportingTcpFastopen();
static native int tcpFastopenMode();
static native String kernelVersion();
}

View File

@ -33,19 +33,15 @@ import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory;
import io.netty.testsuite.transport.socket.SocketTestPermutation;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static io.netty.channel.epoll.Native.IS_SUPPORTING_TCP_FASTOPEN_CLIENT;
import static io.netty.channel.epoll.Native.IS_SUPPORTING_TCP_FASTOPEN_SERVER;
class EpollSocketTestPermutation extends SocketTestPermutation {
static final EpollSocketTestPermutation INSTANCE = new EpollSocketTestPermutation();
@ -57,11 +53,6 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
new MultithreadEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-epoll-worker", true),
EpollHandler.newFactory());
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollSocketTestPermutation.class);
// Constants describing if/how TCP Fast Open is allowed to work on Linux:
private static final int TFO_ENABLED_CLIENT = 1;
private static final int TFO_ENABLED_SERVER = 2;
@Override
public List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> socket() {
List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> list =
@ -84,7 +75,7 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
List<BootstrapFactory<ServerBootstrap>> toReturn = new ArrayList<>();
toReturn.add(() -> new ServerBootstrap().group(EPOLL_BOSS_GROUP, EPOLL_WORKER_GROUP)
.channel(EpollServerSocketChannel.class));
if (isFastOpen()) {
if (IS_SUPPORTING_TCP_FASTOPEN_SERVER) {
toReturn.add(() -> {
ServerBootstrap serverBootstrap = new ServerBootstrap().group(EPOLL_BOSS_GROUP, EPOLL_WORKER_GROUP)
.channel(EpollServerSocketChannel.class);
@ -108,14 +99,17 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
@Override
public List<BootstrapFactory<Bootstrap>> clientSocketWithFastOpen() {
if (isFastOpen()) {
// Keep NIO fixture last.
return Arrays.asList(
() -> new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollSocketChannel.class),
() -> new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollSocketChannel.class)
.option(ChannelOption.TCP_FASTOPEN_CONNECT, true),
() -> new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class)
);
List<BootstrapFactory<Bootstrap>> factories = clientSocket();
if (IS_SUPPORTING_TCP_FASTOPEN_CLIENT) {
int insertIndex = factories.size() - 1; // Keep NIO fixture last.
factories.add(insertIndex, new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollSocketChannel.class)
.option(ChannelOption.TCP_FASTOPEN_CONNECT, true);
}
});
}
return clientSocket();
}
@ -195,36 +189,6 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
);
}
public boolean isFastOpen() {
int tfoEnabled = AccessController.doPrivileged((PrivilegedAction<Integer>) () -> {
int fastopen = 0;
File file = new File("/proc/sys/net/ipv4/tcp_fastopen");
if (file.exists()) {
BufferedReader in = null;
try {
in = new BufferedReader(new FileReader(file));
fastopen = Integer.parseInt(in.readLine());
logger.debug("{}: {}", file, fastopen);
} catch (Exception e) {
logger.debug("Failed to get TCP_FASTOPEN from: {}", file, e);
} finally {
if (in != null) {
try {
in.close();
} catch (Exception e) {
// Ignored.
}
}
}
} else {
logger.debug("{}: {} (non-existent)", file, fastopen);
}
return fastopen;
});
// TCP Fast Open needs to be enabled for both clients and servers, before we can test our intergration with it.
return tfoEnabled == TFO_ENABLED_CLIENT + TFO_ENABLED_SERVER;
}
public static DomainSocketAddress newSocketAddress() {
return UnixTestUtils.newSocketAddress();
}

View File

@ -26,7 +26,19 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.ConnectionPendingException;
import java.nio.channels.NotYetConnectedException;
import static io.netty.channel.unix.ErrorsStaticallyReferencedJniMethods.*;
import static io.netty.channel.unix.ErrorsStaticallyReferencedJniMethods.errnoEAGAIN;
import static io.netty.channel.unix.ErrorsStaticallyReferencedJniMethods.errnoEBADF;
import static io.netty.channel.unix.ErrorsStaticallyReferencedJniMethods.errnoECONNRESET;
import static io.netty.channel.unix.ErrorsStaticallyReferencedJniMethods.errnoEINPROGRESS;
import static io.netty.channel.unix.ErrorsStaticallyReferencedJniMethods.errnoENOENT;
import static io.netty.channel.unix.ErrorsStaticallyReferencedJniMethods.errnoENOTCONN;
import static io.netty.channel.unix.ErrorsStaticallyReferencedJniMethods.errnoEPIPE;
import static io.netty.channel.unix.ErrorsStaticallyReferencedJniMethods.errnoEWOULDBLOCK;
import static io.netty.channel.unix.ErrorsStaticallyReferencedJniMethods.errorEALREADY;
import static io.netty.channel.unix.ErrorsStaticallyReferencedJniMethods.errorECONNREFUSED;
import static io.netty.channel.unix.ErrorsStaticallyReferencedJniMethods.errorEISCONN;
import static io.netty.channel.unix.ErrorsStaticallyReferencedJniMethods.errorENETUNREACH;
import static io.netty.channel.unix.ErrorsStaticallyReferencedJniMethods.strError;
/**
* <strong>Internal usage only!</strong>
@ -94,21 +106,40 @@ public final class Errors {
}
}
public static void throwConnectException(String method, int err)
throws IOException {
static boolean handleConnectErrno(String method, int err) throws IOException {
if (err == ERRNO_EINPROGRESS_NEGATIVE || err == ERROR_EALREADY_NEGATIVE) {
// connect not complete yet need to wait for EPOLLOUT event.
// EALREADY has been observed when using tcp fast open on centos8.
return false;
}
throw newConnectException0(method, err);
}
/**
* @deprecated Use {@link #handleConnectErrno(String, int)}.
* @param method The native method name which caused the errno.
* @param err the negative value of the errno.
* @throws IOException The errno translated into an exception.
*/
@Deprecated
public static void throwConnectException(String method, int err) throws IOException {
if (err == ERROR_EALREADY_NEGATIVE) {
throw new ConnectionPendingException();
}
throw newConnectException0(method, err);
}
private static IOException newConnectException0(String method, int err) {
if (err == ERROR_ENETUNREACH_NEGATIVE) {
throw new NoRouteToHostException();
return new NoRouteToHostException();
}
if (err == ERROR_EISCONN_NEGATIVE) {
throw new AlreadyConnectedException();
}
if (err == ERRNO_ENOENT_NEGATIVE) {
throw new FileNotFoundException();
return new FileNotFoundException();
}
throw new ConnectException(method + "(..) failed: " + ERRORS[-err]);
return new ConnectException(method + "(..) failed: " + ERRORS[-err]);
}
public static NativeIoException newConnectionResetException(String method, int errnoNegative) {

View File

@ -30,13 +30,12 @@ import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicBoolean;
import static io.netty.channel.unix.Errors.ERRNO_EAGAIN_NEGATIVE;
import static io.netty.channel.unix.Errors.ERROR_ECONNREFUSED_NEGATIVE;
import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE;
import static io.netty.channel.unix.Errors.ERRNO_EWOULDBLOCK_NEGATIVE;
import static io.netty.channel.unix.Errors.ERROR_ECONNREFUSED_NEGATIVE;
import static io.netty.channel.unix.Errors.handleConnectErrno;
import static io.netty.channel.unix.Errors.ioResult;
import static io.netty.channel.unix.Errors.newIOException;
import static io.netty.channel.unix.Errors.throwConnectException;
import static io.netty.channel.unix.LimitsStaticallyReferencedJniMethods.udsSunPathSize;
import static io.netty.channel.unix.NativeInetAddress.address;
import static io.netty.channel.unix.NativeInetAddress.ipv4MappedIpv6Address;
@ -268,11 +267,7 @@ public class Socket extends FileDescriptor {
throw new Error("Unexpected SocketAddress implementation " + socketAddress);
}
if (res < 0) {
if (res == ERRNO_EINPROGRESS_NEGATIVE) {
// connect not complete yet need to wait for EPOLLOUT event
return false;
}
throwConnectException("connect", res);
return handleConnectErrno("connect", res);
}
return true;
}
@ -280,11 +275,7 @@ public class Socket extends FileDescriptor {
public final boolean finishConnect() throws IOException {
int res = finishConnect(fd);
if (res < 0) {
if (res == ERRNO_EINPROGRESS_NEGATIVE) {
// connect still in progress
return false;
}
throwConnectException("finishConnect", res);
return handleConnectErrno("finishConnect", res);
}
return true;
}
@ -292,7 +283,7 @@ public class Socket extends FileDescriptor {
public final void disconnect() throws IOException {
int res = disconnect(fd, ipv6);
if (res < 0) {
throwConnectException("disconnect", res);
handleConnectErrno("disconnect", res);
}
}

View File

@ -263,6 +263,11 @@ public abstract class AbstractCoalescingBufferQueue {
}
}
@Override
public String toString() {
return "bytes: " + readableBytes + " buffers: " + (size() >> 1);
}
/**
* Calculate the result of {@code current + next}.
*/