From e7b319557033d889fe5a4c0cbaac34b15f9799ba Mon Sep 17 00:00:00 2001 From: Lunfu Zhong Date: Tue, 19 Mar 2019 18:24:07 +0800 Subject: [PATCH] Support ALLOW_HALF_CLOSURE channel option on Unix domain socket. (#8932) Motivation: Since DomainSocketChannel is a DuplexChannel, which be able to shutdown input or output individually on demands, but ALLOW_HALF_CLOSURE channel option has not been supported yet. I thought this could be a missing feature of Unix domain socket, so here the PR for it. Modifications: 1. Added allHalfClosure property both in EpollDomainSocketChannelConfig and KQueueDomainSocketChannelConfig, 2. Enabled isAllowHalfClosure method of native channel to support domain channel config, 3. Created EpollDomainSocketShutdownOutputByPeerTest and KQueueDomainSocketShutdownOutputByPeerTest to verify the change. Result: ALLOW_HALF_CLOSURE channel option can be set with DomainSocketChannel, and no more warning of Unknown channel option 'ALLOW_HALF_CLOSURE'. --- ...bstractSocketShutdownOutputByPeerTest.java | 163 ++++++++++++++++++ .../SocketShutdownOutputByPeerTest.java | 139 ++------------- .../channel/epoll/AbstractEpollChannel.java | 3 + .../epoll/EpollDomainSocketChannelConfig.java | 31 +++- ...lDomainSocketShutdownOutputByPeerTest.java | 69 ++++++++ .../channel/kqueue/AbstractKQueueChannel.java | 8 +- .../KQueueDomainSocketChannelConfig.java | 27 ++- ...eDomainSocketShutdownOutputByPeerTest.java | 68 ++++++++ 8 files changed, 381 insertions(+), 127 deletions(-) create mode 100644 testsuite/src/main/java/io/netty/testsuite/transport/socket/AbstractSocketShutdownOutputByPeerTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketShutdownOutputByPeerTest.java create mode 100644 transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketShutdownOutputByPeerTest.java diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/AbstractSocketShutdownOutputByPeerTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/AbstractSocketShutdownOutputByPeerTest.java new file mode 100644 index 0000000000..6e895a481f --- /dev/null +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/AbstractSocketShutdownOutputByPeerTest.java @@ -0,0 +1,163 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOption; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.ChannelInputShutdownEvent; +import io.netty.channel.socket.DuplexChannel; +import org.junit.Test; + +import java.io.IOException; +import java.net.SocketAddress; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; + +public abstract class AbstractSocketShutdownOutputByPeerTest extends AbstractServerSocketTest { + + @Test(timeout = 30000) + public void testShutdownOutput() throws Throwable { + run(); + } + + public void testShutdownOutput(ServerBootstrap sb) throws Throwable { + TestHandler h = new TestHandler(); + Socket s = newSocket(); + Channel sc = null; + try { + sc = sb.childHandler(h).childOption(ChannelOption.ALLOW_HALF_CLOSURE, true).bind().sync().channel(); + + connect(s, sc.localAddress()); + write(s, 1); + + assertEquals(1, (int) h.queue.take()); + + assertTrue(h.ch.isOpen()); + assertTrue(h.ch.isActive()); + assertFalse(h.ch.isInputShutdown()); + assertFalse(h.ch.isOutputShutdown()); + + shutdownOutput(s); + + h.halfClosure.await(); + + assertTrue(h.ch.isOpen()); + assertTrue(h.ch.isActive()); + assertTrue(h.ch.isInputShutdown()); + assertFalse(h.ch.isOutputShutdown()); + + while (h.closure.getCount() != 1 && h.halfClosureCount.intValue() != 1) { + Thread.sleep(100); + } + } finally { + if (sc != null) { + sc.close(); + } + close(s); + } + } + + @Test(timeout = 30000) + public void testShutdownOutputWithoutOption() throws Throwable { + run(); + } + + public void testShutdownOutputWithoutOption(ServerBootstrap sb) throws Throwable { + TestHandler h = new TestHandler(); + Socket s = newSocket(); + Channel sc = null; + try { + sc = sb.childHandler(h).bind().sync().channel(); + + connect(s, sc.localAddress()); + write(s, 1); + + assertEquals(1, (int) h.queue.take()); + + assertTrue(h.ch.isOpen()); + assertTrue(h.ch.isActive()); + assertFalse(h.ch.isInputShutdown()); + assertFalse(h.ch.isOutputShutdown()); + + shutdownOutput(s); + + h.closure.await(); + + assertFalse(h.ch.isOpen()); + assertFalse(h.ch.isActive()); + assertTrue(h.ch.isInputShutdown()); + assertTrue(h.ch.isOutputShutdown()); + + while (h.halfClosure.getCount() != 1 && h.halfClosureCount.intValue() != 0) { + Thread.sleep(100); + } + } finally { + if (sc != null) { + sc.close(); + } + close(s); + } + } + + protected abstract void shutdownOutput(Socket s) throws IOException; + + protected abstract void connect(Socket s, SocketAddress address) throws IOException; + + protected abstract void close(Socket s) throws IOException; + + protected abstract void write(Socket s, int data) throws IOException; + + protected abstract Socket newSocket(); + + private static class TestHandler extends SimpleChannelInboundHandler { + volatile DuplexChannel ch; + final BlockingQueue queue = new LinkedBlockingQueue(); + final CountDownLatch halfClosure = new CountDownLatch(1); + final CountDownLatch closure = new CountDownLatch(1); + final AtomicInteger halfClosureCount = new AtomicInteger(); + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ch = (DuplexChannel) ctx.channel(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + closure.countDown(); + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + queue.offer(msg.readByte()); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof ChannelInputShutdownEvent) { + halfClosureCount.incrementAndGet(); + halfClosure.countDown(); + } + } + } +} diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketShutdownOutputByPeerTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketShutdownOutputByPeerTest.java index f18109e039..d42a3c09a0 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketShutdownOutputByPeerTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketShutdownOutputByPeerTest.java @@ -15,138 +15,37 @@ */ package io.netty.testsuite.transport.socket; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOption; -import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.internal.SocketUtils; -import io.netty.channel.socket.ChannelInputShutdownEvent; -import io.netty.channel.socket.SocketChannel; -import org.junit.Test; +import java.io.IOException; import java.net.Socket; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; +import java.net.SocketAddress; -import static org.junit.Assert.*; +public class SocketShutdownOutputByPeerTest extends AbstractSocketShutdownOutputByPeerTest { -public class SocketShutdownOutputByPeerTest extends AbstractServerSocketTest { - - @Test(timeout = 30000) - public void testShutdownOutput() throws Throwable { - run(); + @Override + protected void shutdownOutput(Socket s) throws IOException { + s.shutdownOutput(); } - public void testShutdownOutput(ServerBootstrap sb) throws Throwable { - TestHandler h = new TestHandler(); - Socket s = new Socket(); - Channel sc = null; - try { - sc = sb.childHandler(h).childOption(ChannelOption.ALLOW_HALF_CLOSURE, true).bind().sync().channel(); - - SocketUtils.connect(s, sc.localAddress(), 10000); - s.getOutputStream().write(1); - - assertEquals(1, (int) h.queue.take()); - - assertTrue(h.ch.isOpen()); - assertTrue(h.ch.isActive()); - assertFalse(h.ch.isInputShutdown()); - assertFalse(h.ch.isOutputShutdown()); - - s.shutdownOutput(); - - h.halfClosure.await(); - - assertTrue(h.ch.isOpen()); - assertTrue(h.ch.isActive()); - assertTrue(h.ch.isInputShutdown()); - assertFalse(h.ch.isOutputShutdown()); - assertEquals(1, h.closure.getCount()); - Thread.sleep(100); - assertEquals(1, h.halfClosureCount.intValue()); - } finally { - if (sc != null) { - sc.close(); - } - s.close(); - } + @Override + protected void connect(Socket s, SocketAddress address) throws IOException { + SocketUtils.connect(s, address, 10000); } - @Test(timeout = 30000) - public void testShutdownOutputWithoutOption() throws Throwable { - run(); + @Override + protected void close(Socket s) throws IOException { + s.close(); } - public void testShutdownOutputWithoutOption(ServerBootstrap sb) throws Throwable { - TestHandler h = new TestHandler(); - Socket s = new Socket(); - Channel sc = null; - try { - sc = sb.childHandler(h).bind().sync().channel(); - - SocketUtils.connect(s, sc.localAddress(), 10000); - s.getOutputStream().write(1); - - assertEquals(1, (int) h.queue.take()); - - assertTrue(h.ch.isOpen()); - assertTrue(h.ch.isActive()); - assertFalse(h.ch.isInputShutdown()); - assertFalse(h.ch.isOutputShutdown()); - - s.shutdownOutput(); - - h.closure.await(); - - assertFalse(h.ch.isOpen()); - assertFalse(h.ch.isActive()); - assertTrue(h.ch.isInputShutdown()); - assertTrue(h.ch.isOutputShutdown()); - - assertEquals(1, h.halfClosure.getCount()); - Thread.sleep(100); - assertEquals(0, h.halfClosureCount.intValue()); - } finally { - if (sc != null) { - sc.close(); - } - s.close(); - } + @Override + protected void write(Socket s, int data) throws IOException { + s.getOutputStream().write(data); } - private static class TestHandler extends SimpleChannelInboundHandler { - volatile SocketChannel ch; - final BlockingQueue queue = new LinkedBlockingQueue(); - final CountDownLatch halfClosure = new CountDownLatch(1); - final CountDownLatch closure = new CountDownLatch(1); - final AtomicInteger halfClosureCount = new AtomicInteger(); - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - ch = (SocketChannel) ctx.channel(); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - closure.countDown(); - } - - @Override - public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { - queue.offer(msg.readByte()); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof ChannelInputShutdownEvent) { - halfClosureCount.incrementAndGet(); - halfClosure.countDown(); - } - } + @Override + protected Socket newSocket() { + return new Socket(); } + } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 41c7d2cda3..377c515ab9 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -237,6 +237,9 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann } private static boolean isAllowHalfClosure(ChannelConfig config) { + if (config instanceof EpollDomainSocketChannelConfig) { + return ((EpollDomainSocketChannelConfig) config).isAllowHalfClosure(); + } return config instanceof SocketChannelConfig && ((SocketChannelConfig) config).isAllowHalfClosure(); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannelConfig.java index ea6c5326b5..70b08be1e0 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannelConfig.java @@ -20,14 +20,19 @@ import io.netty.channel.ChannelOption; import io.netty.channel.MessageSizeEstimator; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.unix.DomainSocketChannelConfig; import io.netty.channel.unix.DomainSocketReadMode; import java.util.Map; +import static io.netty.channel.ChannelOption.*; +import static io.netty.channel.unix.UnixChannelOption.*; + public final class EpollDomainSocketChannelConfig extends EpollChannelConfig implements DomainSocketChannelConfig { private volatile DomainSocketReadMode mode = DomainSocketReadMode.BYTES; + private volatile boolean allowHalfClosure; EpollDomainSocketChannelConfig(AbstractEpollChannel channel) { super(channel); @@ -35,15 +40,18 @@ public final class EpollDomainSocketChannelConfig extends EpollChannelConfig @Override public Map, Object> getOptions() { - return getOptions(super.getOptions(), EpollChannelOption.DOMAIN_SOCKET_READ_MODE); + return getOptions(super.getOptions(), DOMAIN_SOCKET_READ_MODE, ALLOW_HALF_CLOSURE); } @SuppressWarnings("unchecked") @Override public T getOption(ChannelOption option) { - if (option == EpollChannelOption.DOMAIN_SOCKET_READ_MODE) { + if (option == DOMAIN_SOCKET_READ_MODE) { return (T) getReadMode(); } + if (option == ALLOW_HALF_CLOSURE) { + return (T) Boolean.valueOf(isAllowHalfClosure()); + } return super.getOption(option); } @@ -51,8 +59,10 @@ public final class EpollDomainSocketChannelConfig extends EpollChannelConfig public boolean setOption(ChannelOption option, T value) { validate(option, value); - if (option == EpollChannelOption.DOMAIN_SOCKET_READ_MODE) { + if (option == DOMAIN_SOCKET_READ_MODE) { setReadMode((DomainSocketReadMode) value); + } else if (option == ALLOW_HALF_CLOSURE) { + setAllowHalfClosure((Boolean) value); } else { return super.setOption(option, value); } @@ -148,4 +158,19 @@ public final class EpollDomainSocketChannelConfig extends EpollChannelConfig public DomainSocketReadMode getReadMode() { return mode; } + + /** + * @see SocketChannelConfig#isAllowHalfClosure() + */ + public boolean isAllowHalfClosure() { + return allowHalfClosure; + } + + /** + * @see SocketChannelConfig#setAllowHalfClosure(boolean) + */ + public EpollDomainSocketChannelConfig setAllowHalfClosure(boolean allowHalfClosure) { + this.allowHalfClosure = allowHalfClosure; + return this; + } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketShutdownOutputByPeerTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketShutdownOutputByPeerTest.java new file mode 100644 index 0000000000..7d0d483f17 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketShutdownOutputByPeerTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel.epoll; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.unix.Buffer; +import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory; +import io.netty.testsuite.transport.socket.AbstractSocketShutdownOutputByPeerTest; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.List; + +public class EpollDomainSocketShutdownOutputByPeerTest extends AbstractSocketShutdownOutputByPeerTest { + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.serverDomainSocket(); + } + + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected void shutdownOutput(LinuxSocket s) throws IOException { + s.shutdown(false, true); + } + + @Override + protected void connect(LinuxSocket s, SocketAddress address) throws IOException { + s.connect(address); + } + + @Override + protected void close(LinuxSocket s) throws IOException { + s.close(); + } + + @Override + protected void write(LinuxSocket s, int data) throws IOException { + final ByteBuffer buf = Buffer.allocateDirectWithNativeOrder(4); + buf.putInt(data); + buf.flip(); + s.write(buf, buf.position(), buf.limit()); + Buffer.free(buf); + } + + @Override + protected LinuxSocket newSocket() { + return LinuxSocket.newSocketDomain(); + } +} diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index f44a7f164f..d7546527d4 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -269,7 +269,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan return 1; } } else { - final ByteBuffer nioBuf = buf.nioBufferCount() == 1 ? + final ByteBuffer nioBuf = buf.nioBufferCount() == 1? buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer(); int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit()); if (localFlushedAmount > 0) { @@ -286,6 +286,10 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan } private static boolean isAllowHalfClosure(ChannelConfig config) { + if (config instanceof KQueueDomainSocketChannelConfig) { + return ((KQueueDomainSocketChannelConfig) config).isAllowHalfClosure(); + } + return config instanceof SocketChannelConfig && ((SocketChannelConfig) config).isAllowHalfClosure(); } @@ -682,7 +686,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan boolean connected = doConnect0(remoteAddress); if (connected) { - remote = remoteSocketAddr == null ? + remote = remoteSocketAddr == null? remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress()); } // We always need to set the localAddress even if not connected yet as the bind already took place. diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainSocketChannelConfig.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainSocketChannelConfig.java index eefd4c0ae1..5284a4b665 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainSocketChannelConfig.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainSocketChannelConfig.java @@ -20,17 +20,20 @@ import io.netty.channel.ChannelOption; import io.netty.channel.MessageSizeEstimator; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.unix.DomainSocketChannelConfig; import io.netty.channel.unix.DomainSocketReadMode; import io.netty.util.internal.UnstableApi; import java.util.Map; -import static io.netty.channel.unix.UnixChannelOption.DOMAIN_SOCKET_READ_MODE; +import static io.netty.channel.ChannelOption.ALLOW_HALF_CLOSURE; +import static io.netty.channel.unix.UnixChannelOption.*; @UnstableApi public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig implements DomainSocketChannelConfig { private volatile DomainSocketReadMode mode = DomainSocketReadMode.BYTES; + private volatile boolean allowHalfClosure; KQueueDomainSocketChannelConfig(AbstractKQueueChannel channel) { super(channel); @@ -38,7 +41,7 @@ public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig i @Override public Map, Object> getOptions() { - return getOptions(super.getOptions(), DOMAIN_SOCKET_READ_MODE); + return getOptions(super.getOptions(), DOMAIN_SOCKET_READ_MODE, ALLOW_HALF_CLOSURE); } @SuppressWarnings("unchecked") @@ -47,6 +50,9 @@ public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig i if (option == DOMAIN_SOCKET_READ_MODE) { return (T) getReadMode(); } + if (option == ALLOW_HALF_CLOSURE) { + return (T) Boolean.valueOf(isAllowHalfClosure()); + } return super.getOption(option); } @@ -56,6 +62,8 @@ public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig i if (option == DOMAIN_SOCKET_READ_MODE) { setReadMode((DomainSocketReadMode) value); + } else if (option == ALLOW_HALF_CLOSURE) { + setAllowHalfClosure((Boolean) value); } else { return super.setOption(option, value); } @@ -151,4 +159,19 @@ public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig i public DomainSocketReadMode getReadMode() { return mode; } + + /** + * @see SocketChannelConfig#isAllowHalfClosure() + */ + public boolean isAllowHalfClosure() { + return allowHalfClosure; + } + + /** + * @see SocketChannelConfig#setAllowHalfClosure(boolean) + */ + public KQueueDomainSocketChannelConfig setAllowHalfClosure(boolean allowHalfClosure) { + this.allowHalfClosure = allowHalfClosure; + return this; + } } diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketShutdownOutputByPeerTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketShutdownOutputByPeerTest.java new file mode 100644 index 0000000000..76532ac1a8 --- /dev/null +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketShutdownOutputByPeerTest.java @@ -0,0 +1,68 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.kqueue; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.unix.Buffer; +import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory; +import io.netty.testsuite.transport.socket.AbstractSocketShutdownOutputByPeerTest; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.List; + +public class KQueueDomainSocketShutdownOutputByPeerTest extends AbstractSocketShutdownOutputByPeerTest { + + @Override + protected List> newFactories() { + return KQueueSocketTestPermutation.INSTANCE.serverDomainSocket(); + } + + @Override + protected SocketAddress newSocketAddress() { + return KQueueSocketTestPermutation.newSocketAddress(); + } + + @Override + protected void shutdownOutput(BsdSocket s) throws IOException { + s.shutdown(false, true); + } + + @Override + protected void connect(BsdSocket s, SocketAddress address) throws IOException { + s.connect(address); + } + + @Override + protected void close(BsdSocket s) throws IOException { + s.close(); + } + + @Override + protected void write(BsdSocket s, int data) throws IOException { + final ByteBuffer buf = Buffer.allocateDirectWithNativeOrder(4); + buf.putInt(data); + buf.flip(); + s.write(buf, buf.position(), buf.limit()); + Buffer.free(buf); + } + + @Override + protected BsdSocket newSocket() { + return BsdSocket.newSocketDomain(); + } +}