diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 8c807828e0..100b6761c8 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -29,6 +29,7 @@ import io.netty.channel.ConnectTimeoutException; import io.netty.channel.DefaultFileRegion; import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.socket.DuplexChannel; import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.Socket; import io.netty.util.internal.EmptyArrays; @@ -44,13 +45,14 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.Queue; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static io.netty.channel.unix.FileDescriptor.pipe; import static io.netty.util.internal.ObjectUtil.checkNotNull; -public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { +public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel { private static final String EXPECTED_TYPES = " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + @@ -537,6 +539,47 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { } } + @Override + public boolean isInputShutdown() { + return fd().isInputShutdown(); + } + + @Override + public boolean isOutputShutdown() { + return fd().isOutputShutdown(); + } + + @Override + public ChannelFuture shutdownOutput() { + return shutdownOutput(newPromise()); + } + + @Override + public ChannelFuture shutdownOutput(final ChannelPromise promise) { + Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose(); + if (closeExecutor != null) { + closeExecutor.execute(new OneTimeTask() { + @Override + public void run() { + shutdownOutput0(promise); + } + }); + } else { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + shutdownOutput0(promise); + } else { + loop.execute(new OneTimeTask() { + @Override + public void run() { + shutdownOutput0(promise); + } + }); + } + } + return promise; + } + @Override protected void doClose() throws Exception { try { @@ -613,6 +656,12 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { private RecvByteBufAllocator.Handle allocHandle; + // Overridden here just to be able to access this method from AbstractEpollStreamChannel + @Override + protected Executor prepareToClose() { + return super.prepareToClose(); + } + private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { if (byteBuf != null) { if (byteBuf.isReadable()) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 4fe6b8a9f4..d0c90ea231 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -16,15 +16,11 @@ package io.netty.channel.epoll; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoop; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.Socket; import io.netty.util.concurrent.GlobalEventExecutor; -import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.PlatformDependent; import java.net.InetAddress; @@ -143,47 +139,6 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme return config; } - @Override - public boolean isInputShutdown() { - return fd().isInputShutdown(); - } - - @Override - public boolean isOutputShutdown() { - return fd().isOutputShutdown(); - } - - @Override - public ChannelFuture shutdownOutput() { - return shutdownOutput(newPromise()); - } - - @Override - public ChannelFuture shutdownOutput(final ChannelPromise promise) { - Executor closeExecutor = ((EpollSocketChannelUnsafe) unsafe()).prepareToClose(); - if (closeExecutor != null) { - closeExecutor.execute(new OneTimeTask() { - @Override - public void run() { - shutdownOutput0(promise); - } - }); - } else { - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - shutdownOutput0(promise); - } else { - loop.execute(new OneTimeTask() { - @Override - public void run() { - shutdownOutput0(promise); - } - }); - } - } - return promise; - } - @Override public ServerSocketChannel parent() { return (ServerSocketChannel) super.parent(); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/unix/DomainSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/unix/DomainSocketChannel.java index f8163d9134..9a6c08b584 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/unix/DomainSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/unix/DomainSocketChannel.java @@ -15,11 +15,13 @@ */ package io.netty.channel.unix; +import io.netty.channel.socket.DuplexChannel; + /** * A {@link UnixChannel} that supports communication via * Unix Domain Socket. */ -public interface DomainSocketChannel extends UnixChannel { +public interface DomainSocketChannel extends UnixChannel, DuplexChannel { @Override DomainSocketAddress remoteAddress(); diff --git a/transport/src/main/java/io/netty/channel/socket/DuplexChannel.java b/transport/src/main/java/io/netty/channel/socket/DuplexChannel.java new file mode 100644 index 0000000000..d34ec36bff --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/DuplexChannel.java @@ -0,0 +1,51 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPromise; + +import java.net.Socket; + +/** + * A duplex {@link Channel} that has two sides that can be shutdown independently. + */ +public interface DuplexChannel extends Channel { + /** + * Returns {@code true} if and only if the remote peer shut down its output so that no more + * data is received from this channel. Note that the semantic of this method is different from + * that of {@link Socket#shutdownInput()} and {@link Socket#isInputShutdown()}. + */ + boolean isInputShutdown(); + + /** + * @see Socket#isOutputShutdown() + */ + boolean isOutputShutdown(); + + /** + * @see Socket#shutdownOutput() + */ + ChannelFuture shutdownOutput(); + + /** + * @see Socket#shutdownOutput() + * + * Will notify the given {@link ChannelPromise} + */ + ChannelFuture shutdownOutput(ChannelPromise promise); +} diff --git a/transport/src/main/java/io/netty/channel/socket/SocketChannel.java b/transport/src/main/java/io/netty/channel/socket/SocketChannel.java index ba0962c9fd..22562c7bab 100644 --- a/transport/src/main/java/io/netty/channel/socket/SocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/SocketChannel.java @@ -16,16 +16,13 @@ package io.netty.channel.socket; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPromise; import java.net.InetSocketAddress; -import java.net.Socket; /** * A TCP/IP socket {@link Channel}. */ -public interface SocketChannel extends Channel { +public interface SocketChannel extends DuplexChannel { @Override ServerSocketChannel parent(); @@ -35,28 +32,4 @@ public interface SocketChannel extends Channel { InetSocketAddress localAddress(); @Override InetSocketAddress remoteAddress(); - - /** - * Returns {@code true} if and only if the remote peer shut down its output so that no more - * data is received from this channel. Note that the semantic of this method is different from - * that of {@link Socket#shutdownInput()} and {@link Socket#isInputShutdown()}. - */ - boolean isInputShutdown(); - - /** - * @see Socket#isOutputShutdown() - */ - boolean isOutputShutdown(); - - /** - * @see Socket#shutdownOutput() - */ - ChannelFuture shutdownOutput(); - - /** - * @see Socket#shutdownOutput() - * - * Will notify the given {@link ChannelPromise} - */ - ChannelFuture shutdownOutput(ChannelPromise future); }