Enable shutdownOutput for EpollDomainSocketChannel

Motivation:

See #4882
Modification:

isInputShutdown, isOutputShutdown and shutdownOutput methods moved from io.netty.channel.socket.SocketChannel to a new interface io.netty.channel.socket.DuplexChannel.
io.netty.channel.unix.DomainSocketChannel now extends DuplexChannel
Methods implementing DuplexChannel moved from EpollSocketChannel to AbstractEpollStreamChannel.
Result:

Possible to call shutdownOutput on EpollDomainSocketChannel
This commit is contained in:
Roman Timushev 2016-02-18 19:41:55 +03:00 committed by Norman Maurer
parent f6bfecc822
commit 02329d10c7
5 changed files with 105 additions and 75 deletions

View File

@ -29,6 +29,7 @@ import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.DefaultFileRegion; import io.netty.channel.DefaultFileRegion;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DuplexChannel;
import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.Socket; import io.netty.channel.unix.Socket;
import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.EmptyArrays;
@ -44,13 +45,14 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static io.netty.channel.unix.FileDescriptor.pipe; import static io.netty.channel.unix.FileDescriptor.pipe;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
private static final String EXPECTED_TYPES = private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
@ -537,6 +539,47 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
} }
} }
@Override
public boolean isInputShutdown() {
return fd().isInputShutdown();
}
@Override
public boolean isOutputShutdown() {
return fd().isOutputShutdown();
}
@Override
public ChannelFuture shutdownOutput() {
return shutdownOutput(newPromise());
}
@Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) {
Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
} else {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdownOutput0(promise);
} else {
loop.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
}
}
return promise;
}
@Override @Override
protected void doClose() throws Exception { protected void doClose() throws Exception {
try { try {
@ -613,6 +656,12 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
private RecvByteBufAllocator.Handle allocHandle; private RecvByteBufAllocator.Handle allocHandle;
// Overridden here just to be able to access this method from AbstractEpollStreamChannel
@Override
protected Executor prepareToClose() {
return super.prepareToClose();
}
private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
if (byteBuf != null) { if (byteBuf != null) {
if (byteBuf.isReadable()) { if (byteBuf.isReadable()) {

View File

@ -16,15 +16,11 @@
package io.netty.channel.epoll; package io.netty.channel.epoll;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.Socket; import io.netty.channel.unix.Socket;
import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import java.net.InetAddress; import java.net.InetAddress;
@ -143,47 +139,6 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
return config; return config;
} }
@Override
public boolean isInputShutdown() {
return fd().isInputShutdown();
}
@Override
public boolean isOutputShutdown() {
return fd().isOutputShutdown();
}
@Override
public ChannelFuture shutdownOutput() {
return shutdownOutput(newPromise());
}
@Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) {
Executor closeExecutor = ((EpollSocketChannelUnsafe) unsafe()).prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
} else {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdownOutput0(promise);
} else {
loop.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
}
}
return promise;
}
@Override @Override
public ServerSocketChannel parent() { public ServerSocketChannel parent() {
return (ServerSocketChannel) super.parent(); return (ServerSocketChannel) super.parent();

View File

@ -15,11 +15,13 @@
*/ */
package io.netty.channel.unix; package io.netty.channel.unix;
import io.netty.channel.socket.DuplexChannel;
/** /**
* A {@link UnixChannel} that supports communication via * A {@link UnixChannel} that supports communication via
* <a href="http://en.wikipedia.org/wiki/Unix_domain_socket">Unix Domain Socket</a>. * <a href="http://en.wikipedia.org/wiki/Unix_domain_socket">Unix Domain Socket</a>.
*/ */
public interface DomainSocketChannel extends UnixChannel { public interface DomainSocketChannel extends UnixChannel, DuplexChannel {
@Override @Override
DomainSocketAddress remoteAddress(); DomainSocketAddress remoteAddress();

View File

@ -0,0 +1,51 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import java.net.Socket;
/**
* A duplex {@link Channel} that has two sides that can be shutdown independently.
*/
public interface DuplexChannel extends Channel {
/**
* Returns {@code true} if and only if the remote peer shut down its output so that no more
* data is received from this channel. Note that the semantic of this method is different from
* that of {@link Socket#shutdownInput()} and {@link Socket#isInputShutdown()}.
*/
boolean isInputShutdown();
/**
* @see Socket#isOutputShutdown()
*/
boolean isOutputShutdown();
/**
* @see Socket#shutdownOutput()
*/
ChannelFuture shutdownOutput();
/**
* @see Socket#shutdownOutput()
*
* Will notify the given {@link ChannelPromise}
*/
ChannelFuture shutdownOutput(ChannelPromise promise);
}

View File

@ -16,16 +16,13 @@
package io.netty.channel.socket; package io.netty.channel.socket;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket;
/** /**
* A TCP/IP socket {@link Channel}. * A TCP/IP socket {@link Channel}.
*/ */
public interface SocketChannel extends Channel { public interface SocketChannel extends DuplexChannel {
@Override @Override
ServerSocketChannel parent(); ServerSocketChannel parent();
@ -35,28 +32,4 @@ public interface SocketChannel extends Channel {
InetSocketAddress localAddress(); InetSocketAddress localAddress();
@Override @Override
InetSocketAddress remoteAddress(); InetSocketAddress remoteAddress();
/**
* Returns {@code true} if and only if the remote peer shut down its output so that no more
* data is received from this channel. Note that the semantic of this method is different from
* that of {@link Socket#shutdownInput()} and {@link Socket#isInputShutdown()}.
*/
boolean isInputShutdown();
/**
* @see Socket#isOutputShutdown()
*/
boolean isOutputShutdown();
/**
* @see Socket#shutdownOutput()
*/
ChannelFuture shutdownOutput();
/**
* @see Socket#shutdownOutput()
*
* Will notify the given {@link ChannelPromise}
*/
ChannelFuture shutdownOutput(ChannelPromise future);
} }