Fix checkstyle errors

This commit is contained in:
Norman Maurer 2020-09-07 10:29:41 +02:00
parent 8c465e2f1b
commit ddb503f76d
11 changed files with 120 additions and 94 deletions

View File

@ -34,7 +34,13 @@ import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ChannelInputShutdownReadComplete; import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.socket.SocketChannelConfig;
import io.netty.channel.unix.*; import io.netty.channel.unix.Buffer;
import io.netty.channel.unix.Errors;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.NativeInetAddress;
import io.netty.channel.unix.Socket;
import io.netty.channel.unix.UnixChannel;
import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -320,7 +326,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
} }
} }
protected final void doWriteSingle(ByteBuf buf) { protected final void doWriteSingle(ByteBuf buf) {
IOUringSubmissionQueue submissionQueue = submissionQueue(); IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(), submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(),
@ -520,7 +525,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally { } finally {
if (!connectStillInProgress) { if (!connectStillInProgress) {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0
// is used
// See https://github.com/netty/netty/issues/1770 // See https://github.com/netty/netty/issues/1770
cancelConnectTimeoutFuture(); cancelConnectTimeoutFuture();
connectPromise = null; connectPromise = null;
@ -573,7 +579,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
} catch (Throwable cause) { } catch (Throwable cause) {
fulfillConnectPromise(connectPromise, cause); fulfillConnectPromise(connectPromise, cause);
} finally { } finally {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is
// used
// See https://github.com/netty/netty/issues/1770 // See https://github.com/netty/netty/issues/1770
cancelConnectTimeoutFuture(); cancelConnectTimeoutFuture();
connectPromise = null; connectPromise = null;
@ -601,7 +608,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(SOCK_ADDR_LEN); remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(SOCK_ADDR_LEN);
long remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory); long remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
socket.initAddress(address.address(), address.scopeId(), inetSocketAddress.getPort(), remoteAddressMemoryAddress); socket.initAddress(address.address(), address.scopeId(), inetSocketAddress.getPort(),
remoteAddressMemoryAddress);
final IOUringSubmissionQueue ioUringSubmissionQueue = submissionQueue(); final IOUringSubmissionQueue ioUringSubmissionQueue = submissionQueue();
ioUringSubmissionQueue.addConnect(socket.intValue(), remoteAddressMemoryAddress, SOCK_ADDR_LEN); ioUringSubmissionQueue.addConnect(socket.intValue(), remoteAddressMemoryAddress, SOCK_ADDR_LEN);
} catch (Throwable t) { } catch (Throwable t) {
@ -629,7 +637,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
promise.addListener(new ChannelFutureListener() { promise.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) {
if (future.isCancelled()) { if (future.isCancelled()) {
cancelConnectTimeoutFuture(); cancelConnectTimeoutFuture();
connectPromise = null; connectPromise = null;
@ -688,7 +696,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
return remote; return remote;
} }
public Socket getSocket() { protected Socket getSocket() {
return socket; return socket;
} }

View File

@ -15,6 +15,7 @@
*/ */
package io.netty.channel.uring; package io.netty.channel.uring;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.SystemPropertyUtil;
final class IOUring { final class IOUring {
@ -28,19 +29,24 @@ final class IOUring {
cause = new UnsupportedOperationException( cause = new UnsupportedOperationException(
"Native transport was explicit disabled with -Dio.netty.transport.noNative=true"); "Native transport was explicit disabled with -Dio.netty.transport.noNative=true");
} else { } else {
RingBuffer ringBuffer = null; Throwable unsafeCause = PlatformDependent.getUnsafeUnavailabilityCause();
try { if (unsafeCause == null) {
ringBuffer = Native.createRingBuffer(); RingBuffer ringBuffer = null;
} catch (Throwable t) { try {
cause = t; ringBuffer = Native.createRingBuffer();
} finally { } catch (Throwable t) {
if (ringBuffer != null) { cause = t;
try { } finally {
ringBuffer.close(); if (ringBuffer != null) {
} catch (Exception ignore) { try {
// ignore ringBuffer.close();
} catch (Exception ignore) {
// ignore
}
} }
} }
} else {
cause = new UnsupportedOperationException("Unsafe is not supported", unsafeCause);
} }
} }

View File

@ -16,16 +16,18 @@
package io.netty.channel.uring; package io.netty.channel.uring;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.ServerSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannelConfig;
import io.netty.util.NetUtil; import io.netty.util.NetUtil;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import static io.netty.channel.ChannelOption.SO_BACKLOG;
import static io.netty.channel.ChannelOption.SO_RCVBUF;
import static io.netty.channel.ChannelOption.SO_REUSEADDR;
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
public final class IOUringServerSocketChannelConfig extends DefaultChannelConfig implements ServerSocketChannelConfig { public final class IOUringServerSocketChannelConfig extends DefaultChannelConfig implements ServerSocketChannelConfig {
@ -38,21 +40,21 @@ public final class IOUringServerSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public Map<ChannelOption<?>, Object> getOptions() { public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG, return getOptions(super.getOptions(), ChannelOption.SO_RCVBUF, ChannelOption.SO_REUSEADDR,
IOUringChannelOption.SO_REUSEPORT, IOUringChannelOption.IP_FREEBIND, ChannelOption.SO_BACKLOG, IOUringChannelOption.SO_REUSEPORT, IOUringChannelOption.IP_FREEBIND,
IOUringChannelOption.IP_TRANSPARENT, IOUringChannelOption.TCP_DEFER_ACCEPT); IOUringChannelOption.IP_TRANSPARENT, IOUringChannelOption.TCP_DEFER_ACCEPT);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public <T> T getOption(ChannelOption<T> option) { public <T> T getOption(ChannelOption<T> option) {
if (option == SO_RCVBUF) { if (option == ChannelOption.SO_RCVBUF) {
return (T) Integer.valueOf(getReceiveBufferSize()); return (T) Integer.valueOf(getReceiveBufferSize());
} }
if (option == SO_REUSEADDR) { if (option == ChannelOption.SO_REUSEADDR) {
return (T) Boolean.valueOf(isReuseAddress()); return (T) Boolean.valueOf(isReuseAddress());
} }
if (option == SO_BACKLOG) { if (option == ChannelOption.SO_BACKLOG) {
return (T) Integer.valueOf(getBacklog()); return (T) Integer.valueOf(getBacklog());
} }
if (option == IOUringChannelOption.SO_REUSEPORT) { if (option == IOUringChannelOption.SO_REUSEPORT) {
@ -73,11 +75,11 @@ public final class IOUringServerSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public <T> boolean setOption(ChannelOption<T> option, T value) { public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value); validate(option, value);
if (option == SO_RCVBUF) { if (option == ChannelOption.SO_RCVBUF) {
setReceiveBufferSize((Integer) value); setReceiveBufferSize((Integer) value);
} else if (option == SO_REUSEADDR) { } else if (option == ChannelOption.SO_REUSEADDR) {
setReuseAddress((Boolean) value); setReuseAddress((Boolean) value);
} else if (option == SO_BACKLOG) { } else if (option == ChannelOption.SO_BACKLOG) {
setBacklog((Integer) value); setBacklog((Integer) value);
} else if (option == IOUringChannelOption.SO_REUSEPORT) { } else if (option == IOUringChannelOption.SO_REUSEPORT) {
setReusePort((Boolean) value); setReusePort((Boolean) value);
@ -225,7 +227,8 @@ public final class IOUringServerSocketChannelConfig extends DefaultChannelConfig
/** /**
* Set the SO_REUSEPORT option on the underlying Channel. This will allow to bind multiple * Set the SO_REUSEPORT option on the underlying Channel. This will allow to bind multiple
* {@link io.netty.channel.socket.ServerSocketChannel}s to the same port and so accept connections with multiple threads. * {@link io.netty.channel.socket.ServerSocketChannel}s to the same port and so accept connections with multiple
* threads.
* *
* Be aware this method needs be called before * Be aware this method needs be called before
* {@link io.netty.channel.socket.ServerSocketChannel#bind(java.net.SocketAddress)} to have any affect. * {@link io.netty.channel.socket.ServerSocketChannel#bind(java.net.SocketAddress)} to have any affect.

View File

@ -357,7 +357,7 @@ final class IOUringSubmissionQueue {
} }
public long count() { public long count() {
return (sqeTail - toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress))); return sqeTail - toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress));
} }
//delete memory //delete memory

View File

@ -1,3 +1,18 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring; package io.netty.channel.uring;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
@ -34,7 +49,7 @@ final class IovecArrayPool implements MessageProcessor {
private final ByteBuffer iovecArrayMemory; private final ByteBuffer iovecArrayMemory;
private final long iovecArrayMemoryAddress; private final long iovecArrayMemoryAddress;
public IovecArrayPool() { IovecArrayPool() {
//setup array //setup array
remainingIovec = new Stack<Long>(); remainingIovec = new Stack<Long>();
@ -47,8 +62,7 @@ final class IovecArrayPool implements MessageProcessor {
} }
//Todo better naming //Todo better naming
public long createNewIovecMemoryAddress() { long createNewIovecMemoryAddress() {
//clear //clear
size = 0; size = 0;
count = 0; count = 0;
@ -65,13 +79,12 @@ final class IovecArrayPool implements MessageProcessor {
} }
//Todo error handling //Todo error handling
public void releaseIovec(long iovecAddress) { void releaseIovec(long iovecAddress) {
long index = (iovecAddress - iovecArrayMemoryAddress) / IOVEC_ARRAY_SIZE; long index = (iovecAddress - iovecArrayMemoryAddress) / IOVEC_ARRAY_SIZE;
remainingIovec.push(index); remainingIovec.push(index);
} }
private boolean add(ByteBuf buf, int offset, int len) { private boolean add(ByteBuf buf, int offset, int len) {
if (count == IOV_ENTRIES) { if (count == IOV_ENTRIES) {
// No more room! // No more room!
@ -138,7 +151,7 @@ final class IovecArrayPool implements MessageProcessor {
return false; return false;
} }
public int count() { int count() {
return count; return count;
} }
@ -146,7 +159,7 @@ final class IovecArrayPool implements MessageProcessor {
return IOV_SIZE * index; return IOV_SIZE * index;
} }
public void release() { void release() {
Buffer.free(iovecArrayMemory); Buffer.free(iovecArrayMemory);
} }
} }

View File

@ -387,5 +387,6 @@ final class LinuxSocket extends Socket {
private static native int getIpMulticastLoop(int fd, boolean ipv6) throws IOException; private static native int getIpMulticastLoop(int fd, boolean ipv6) throws IOException;
private static native void setIpMulticastLoop(int fd, boolean ipv6, int enabled) throws IOException; private static native void setIpMulticastLoop(int fd, boolean ipv6, int enabled) throws IOException;
private static native void setTimeToLive(int fd, int ttl) throws IOException; private static native void setTimeToLive(int fd, int ttl) throws IOException;
private static native int initAddress(int fd, boolean ipv6, byte[] address, int scopeId, int port, long memoryAddress); private static native int initAddress(
int fd, boolean ipv6, byte[] address, int scopeId, int port, long memoryAddress);
} }

View File

@ -79,13 +79,10 @@ public class IOUringEventLoopTest extends AbstractSingleThreadEventLoopTest {
EventLoop loop = group.next(); EventLoop loop = group.next();
loop.schedule(new Runnable() { loop.schedule(new Runnable() {
@Override @Override
public void run() { public void run() { }
}
}, 1, TimeUnit.SECONDS).sync(); }, 1, TimeUnit.SECONDS).sync();
} finally { } finally {
group.shutdownGracefully(); group.shutdownGracefully();
} }
} }
} }

View File

@ -27,4 +27,4 @@ public class IOUringSocketDataReadInitialStateTest extends SocketDataReadInitial
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() { protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return IOUringSocketTestPermutation.INSTANCE.socket(); return IOUringSocketTestPermutation.INSTANCE.socket();
} }
} }

View File

@ -27,4 +27,4 @@ public class IOUringSocketExceptionHandlingTest extends SocketExceptionHandlingT
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() { protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return IOUringSocketTestPermutation.INSTANCE.socket(); return IOUringSocketTestPermutation.INSTANCE.socket();
} }
} }

View File

@ -1,3 +1,18 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring; package io.netty.channel.uring;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -17,17 +32,21 @@ public class IOUringSubmissionQueueTest {
@Test @Test
public void sqeFullTest() { public void sqeFullTest() {
RingBuffer ringBuffer = Native.createRingBuffer(8); RingBuffer ringBuffer = Native.createRingBuffer(8);
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue(); try {
final IOUringCompletionQueue completionQueue = ringBuffer.getIoUringCompletionQueue(); IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
final IOUringCompletionQueue completionQueue = ringBuffer.getIoUringCompletionQueue();
assertNotNull(ringBuffer); assertNotNull(ringBuffer);
assertNotNull(submissionQueue); assertNotNull(submissionQueue);
assertNotNull(completionQueue); assertNotNull(completionQueue);
int counter = 0; int counter = 0;
while(!submissionQueue.addAccept(-1)) { while (!submissionQueue.addAccept(-1)) {
counter++; counter++;
}
assertEquals(8, counter);
} finally {
ringBuffer.close();
} }
assertEquals(8, counter);
} }
} }

View File

@ -1,12 +1,23 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring; package io.netty.channel.uring;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
@ -24,29 +35,7 @@ public class PollRemoveTest {
assumeTrue(IOUring.isAvailable()); assumeTrue(IOUring.isAvailable());
} }
@Sharable private void io_uring_test() throws Exception {
private static final class EchoUringServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
void io_uring_test() throws Exception {
Class<? extends ServerSocketChannel> clazz = IOUringServerSocketChannel.class; Class<? extends ServerSocketChannel> clazz = IOUringServerSocketChannel.class;
final EventLoopGroup bossGroup = new IOUringEventLoopGroup(1); final EventLoopGroup bossGroup = new IOUringEventLoopGroup(1);
final EventLoopGroup workerGroup = new IOUringEventLoopGroup(1); final EventLoopGroup workerGroup = new IOUringEventLoopGroup(1);
@ -58,11 +47,7 @@ public class PollRemoveTest {
.handler(new LoggingHandler(LogLevel.TRACE)) .handler(new LoggingHandler(LogLevel.TRACE))
.childHandler(new ChannelInitializer<SocketChannel>() { .childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
public void initChannel(SocketChannel ch) throws Exception { public void initChannel(SocketChannel ch) { }
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoUringServerHandler());
}
}); });
Channel sc = b.bind(2020).sync().channel(); Channel sc = b.bind(2020).sync().channel();
@ -74,18 +59,12 @@ public class PollRemoveTest {
bossGroup.shutdownGracefully(); bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully(); workerGroup.shutdownGracefully();
} }
} }
@Test @Test
public void test() throws Exception { public void test() throws Exception {
io_uring_test(); io_uring_test();
System.out.println("io_uring --------------------------------");
io_uring_test(); io_uring_test();
} }
} }