Start to refactor oio transport to share more code. See #186
This commit is contained in:
parent
1b099acde0
commit
65be9ebd44
@ -0,0 +1,115 @@
|
||||
/*
|
||||
* Copyright 2011 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.socket.oio;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
|
||||
import io.netty.channel.AbstractChannel;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFactory;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelSink;
|
||||
|
||||
abstract class AbstractOioChannel extends AbstractChannel{
|
||||
private volatile InetSocketAddress localAddress;
|
||||
volatile InetSocketAddress remoteAddress;
|
||||
volatile Thread workerThread;
|
||||
final Object interestOpsLock = new Object();
|
||||
|
||||
AbstractOioChannel(
|
||||
Channel parent,
|
||||
ChannelFactory factory,
|
||||
ChannelPipeline pipeline,
|
||||
ChannelSink sink) {
|
||||
super(parent, factory, pipeline, sink);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean setClosed() {
|
||||
return super.setClosed();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setInterestOpsNow(int interestOps) {
|
||||
super.setInterestOpsNow(interestOps);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
|
||||
if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
|
||||
return super.write(message, null);
|
||||
} else {
|
||||
return super.write(message, remoteAddress);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isBound() {
|
||||
return isOpen() && isSocketBound();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isConnected() {
|
||||
return isOpen() && isSocketConnected();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getLocalAddress() {
|
||||
InetSocketAddress localAddress = this.localAddress;
|
||||
if (localAddress == null) {
|
||||
try {
|
||||
this.localAddress = localAddress =
|
||||
(InetSocketAddress) getLocalSocketAddress();
|
||||
} catch (Throwable t) {
|
||||
// Sometimes fails on a closed socket in Windows.
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return localAddress;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getRemoteAddress() {
|
||||
InetSocketAddress remoteAddress = this.remoteAddress;
|
||||
if (remoteAddress == null) {
|
||||
try {
|
||||
this.remoteAddress = remoteAddress =
|
||||
(InetSocketAddress) getRemoteSocketAddress();
|
||||
} catch (Throwable t) {
|
||||
// Sometimes fails on a closed socket in Windows.
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return remoteAddress;
|
||||
}
|
||||
|
||||
abstract boolean isSocketBound();
|
||||
|
||||
abstract boolean isSocketConnected();
|
||||
|
||||
abstract boolean isSocketClosed();
|
||||
|
||||
abstract InetSocketAddress getLocalSocketAddress() throws Exception;
|
||||
|
||||
abstract InetSocketAddress getRemoteSocketAddress() throws Exception;
|
||||
|
||||
abstract void closeSocket() throws IOException;
|
||||
|
||||
}
|
@ -0,0 +1,158 @@
|
||||
/*
|
||||
* Copyright 2011 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.socket.oio;
|
||||
|
||||
import static io.netty.channel.Channels.fireChannelClosed;
|
||||
import static io.netty.channel.Channels.fireChannelDisconnected;
|
||||
import static io.netty.channel.Channels.fireChannelInterestChanged;
|
||||
import static io.netty.channel.Channels.fireChannelUnbound;
|
||||
import static io.netty.channel.Channels.fireExceptionCaught;
|
||||
import static io.netty.channel.Channels.succeededFuture;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.Channels;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Abstract base class for Oio-Worker implementations
|
||||
*
|
||||
* @param <C> {@link AbstractOioChannel}
|
||||
*/
|
||||
abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Runnable {
|
||||
|
||||
protected final C channel;
|
||||
|
||||
public AbstractOioWorker(C channel) {
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
channel.workerThread = Thread.currentThread();
|
||||
|
||||
while (channel.isOpen()) {
|
||||
synchronized (channel.interestOpsLock) {
|
||||
while (!channel.isReadable()) {
|
||||
try {
|
||||
// notify() is not called at all.
|
||||
// close() and setInterestOps() calls Thread.interrupt()
|
||||
channel.interestOpsLock.wait();
|
||||
} catch (InterruptedException e) {
|
||||
if (!channel.isOpen()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
if (!process()) {
|
||||
break;
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
if (!channel.isSocketClosed()) {
|
||||
fireExceptionCaught(channel, t);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Setting the workerThread to null will prevent any channel
|
||||
// operations from interrupting this thread from now on.
|
||||
channel.workerThread = null;
|
||||
|
||||
// Clean up.
|
||||
close(channel, succeededFuture(channel));
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the incoming messages and also is responsible for call {@link Channels#fireMessageReceived(Channel, Object)} once a message
|
||||
* was processed without errors.
|
||||
*
|
||||
* @return continue returns <code>true</code> as long as this worker should continue to try processing incoming messages
|
||||
* @throws IOException
|
||||
*/
|
||||
abstract boolean process() throws IOException;
|
||||
|
||||
static void setInterestOps(
|
||||
AbstractOioChannel channel, ChannelFuture future, int interestOps) {
|
||||
|
||||
// Override OP_WRITE flag - a user cannot change this flag.
|
||||
interestOps &= ~Channel.OP_WRITE;
|
||||
interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
|
||||
|
||||
boolean changed = false;
|
||||
try {
|
||||
if (channel.getInterestOps() != interestOps) {
|
||||
if ((interestOps & Channel.OP_READ) != 0) {
|
||||
channel.setInterestOpsNow(Channel.OP_READ);
|
||||
} else {
|
||||
channel.setInterestOpsNow(Channel.OP_NONE);
|
||||
}
|
||||
changed = true;
|
||||
}
|
||||
|
||||
future.setSuccess();
|
||||
if (changed) {
|
||||
synchronized (channel.interestOpsLock) {
|
||||
channel.setInterestOpsNow(interestOps);
|
||||
|
||||
// Notify the worker so it stops or continues reading.
|
||||
Thread currentThread = Thread.currentThread();
|
||||
Thread workerThread = channel.workerThread;
|
||||
if (workerThread != null && currentThread != workerThread) {
|
||||
workerThread.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
fireChannelInterestChanged(channel);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
future.setFailure(t);
|
||||
fireExceptionCaught(channel, t);
|
||||
}
|
||||
}
|
||||
|
||||
static void close(AbstractOioChannel channel, ChannelFuture future) {
|
||||
boolean connected = channel.isConnected();
|
||||
boolean bound = channel.isBound();
|
||||
try {
|
||||
channel.closeSocket();
|
||||
if (channel.setClosed()) {
|
||||
future.setSuccess();
|
||||
if (connected) {
|
||||
// Notify the worker so it stops reading.
|
||||
Thread currentThread = Thread.currentThread();
|
||||
Thread workerThread = channel.workerThread;
|
||||
if (workerThread != null && currentThread != workerThread) {
|
||||
workerThread.interrupt();
|
||||
}
|
||||
fireChannelDisconnected(channel);
|
||||
}
|
||||
if (bound) {
|
||||
fireChannelUnbound(channel);
|
||||
}
|
||||
fireChannelClosed(channel);
|
||||
} else {
|
||||
future.setSuccess();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
future.setFailure(t);
|
||||
fireExceptionCaught(channel, t);
|
||||
}
|
||||
}
|
||||
}
|
@ -22,28 +22,22 @@ import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.MulticastSocket;
|
||||
import java.net.NetworkInterface;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.SocketException;
|
||||
|
||||
import io.netty.channel.AbstractChannel;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelFactory;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelSink;
|
||||
import io.netty.channel.socket.DatagramChannel;
|
||||
import io.netty.channel.socket.DatagramChannelConfig;
|
||||
import io.netty.channel.socket.DefaultDatagramChannelConfig;
|
||||
|
||||
final class OioDatagramChannel extends AbstractChannel
|
||||
final class OioDatagramChannel extends AbstractOioChannel
|
||||
implements DatagramChannel {
|
||||
|
||||
final MulticastSocket socket;
|
||||
final Object interestOpsLock = new Object();
|
||||
private final DatagramChannelConfig config;
|
||||
volatile Thread workerThread;
|
||||
private volatile InetSocketAddress localAddress;
|
||||
volatile InetSocketAddress remoteAddress;
|
||||
|
||||
|
||||
static OioDatagramChannel create(ChannelFactory factory,
|
||||
ChannelPipeline pipeline, ChannelSink sink) {
|
||||
@ -81,65 +75,6 @@ final class OioDatagramChannel extends AbstractChannel
|
||||
return config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getLocalAddress() {
|
||||
InetSocketAddress localAddress = this.localAddress;
|
||||
if (localAddress == null) {
|
||||
try {
|
||||
this.localAddress = localAddress =
|
||||
(InetSocketAddress) socket.getLocalSocketAddress();
|
||||
} catch (Throwable t) {
|
||||
// Sometimes fails on a closed socket in Windows.
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return localAddress;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getRemoteAddress() {
|
||||
InetSocketAddress remoteAddress = this.remoteAddress;
|
||||
if (remoteAddress == null) {
|
||||
try {
|
||||
this.remoteAddress = remoteAddress =
|
||||
(InetSocketAddress) socket.getRemoteSocketAddress();
|
||||
} catch (Throwable t) {
|
||||
// Sometimes fails on a closed socket in Windows.
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return remoteAddress;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isBound() {
|
||||
return isOpen() && socket.isBound();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isConnected() {
|
||||
return isOpen() && socket.isConnected();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean setClosed() {
|
||||
return super.setClosed();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setInterestOpsNow(int interestOps) {
|
||||
super.setInterestOpsNow(interestOps);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
|
||||
if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
|
||||
return super.write(message, null);
|
||||
} else {
|
||||
return super.write(message, remoteAddress);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void joinGroup(InetAddress multicastAddress) {
|
||||
ensureBound();
|
||||
@ -187,4 +122,36 @@ final class OioDatagramChannel extends AbstractChannel
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isSocketBound() {
|
||||
return socket.isBound();
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isSocketConnected() {
|
||||
return socket.isConnected();
|
||||
}
|
||||
|
||||
@Override
|
||||
InetSocketAddress getLocalSocketAddress() throws Exception{
|
||||
return (InetSocketAddress) socket.getLocalSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
InetSocketAddress getRemoteSocketAddress() throws Exception{
|
||||
return (InetSocketAddress) socket.getRemoteSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
void closeSocket() throws IOException {
|
||||
socket.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isSocketClosed() {
|
||||
return socket.isClosed();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -17,77 +17,49 @@ package io.netty.channel.socket.oio;
|
||||
|
||||
import static io.netty.channel.Channels.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.net.DatagramPacket;
|
||||
import java.net.MulticastSocket;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import io.netty.buffer.ChannelBuffer;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ReceiveBufferSizePredictor;
|
||||
|
||||
class OioDatagramWorker implements Runnable {
|
||||
|
||||
private final OioDatagramChannel channel;
|
||||
class OioDatagramWorker extends AbstractOioWorker<OioDatagramChannel> {
|
||||
|
||||
OioDatagramWorker(OioDatagramChannel channel) {
|
||||
this.channel = channel;
|
||||
super(channel);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
channel.workerThread = Thread.currentThread();
|
||||
final MulticastSocket socket = channel.socket;
|
||||
boolean process() throws IOException {
|
||||
|
||||
while (channel.isOpen()) {
|
||||
synchronized (channel.interestOpsLock) {
|
||||
while (!channel.isReadable()) {
|
||||
try {
|
||||
// notify() is not called at all.
|
||||
// close() and setInterestOps() calls Thread.interrupt()
|
||||
channel.interestOpsLock.wait();
|
||||
} catch (InterruptedException e) {
|
||||
if (!channel.isOpen()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
ReceiveBufferSizePredictor predictor =
|
||||
channel.getConfig().getReceiveBufferSizePredictor();
|
||||
|
||||
ReceiveBufferSizePredictor predictor =
|
||||
channel.getConfig().getReceiveBufferSizePredictor();
|
||||
|
||||
byte[] buf = new byte[predictor.nextReceiveBufferSize()];
|
||||
DatagramPacket packet = new DatagramPacket(buf, buf.length);
|
||||
try {
|
||||
socket.receive(packet);
|
||||
} catch (InterruptedIOException e) {
|
||||
// Can happen on interruption.
|
||||
// Keep receiving unless the channel is closed.
|
||||
continue;
|
||||
} catch (Throwable t) {
|
||||
if (!channel.socket.isClosed()) {
|
||||
fireExceptionCaught(channel, t);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
fireMessageReceived(
|
||||
channel,
|
||||
channel.getConfig().getBufferFactory().getBuffer(buf, 0, packet.getLength()),
|
||||
packet.getSocketAddress());
|
||||
byte[] buf = new byte[predictor.nextReceiveBufferSize()];
|
||||
DatagramPacket packet = new DatagramPacket(buf, buf.length);
|
||||
try {
|
||||
channel.socket.receive(packet);
|
||||
} catch (InterruptedIOException e) {
|
||||
// Can happen on interruption.
|
||||
// Keep receiving unless the channel is closed.
|
||||
return true;
|
||||
}
|
||||
|
||||
// Setting the workerThread to null will prevent any channel
|
||||
// operations from interrupting this thread from now on.
|
||||
channel.workerThread = null;
|
||||
|
||||
// Clean up.
|
||||
close(channel, succeededFuture(channel));
|
||||
fireMessageReceived(
|
||||
channel,
|
||||
channel.getConfig().getBufferFactory().getBuffer(buf, 0, packet.getLength()),
|
||||
packet.getSocketAddress());
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
||||
static void write(
|
||||
OioDatagramChannel channel, ChannelFuture future,
|
||||
Object message, SocketAddress remoteAddress) {
|
||||
@ -120,44 +92,6 @@ class OioDatagramWorker implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
static void setInterestOps(
|
||||
OioDatagramChannel channel, ChannelFuture future, int interestOps) {
|
||||
|
||||
// Override OP_WRITE flag - a user cannot change this flag.
|
||||
interestOps &= ~Channel.OP_WRITE;
|
||||
interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
|
||||
|
||||
boolean changed = false;
|
||||
try {
|
||||
if (channel.getInterestOps() != interestOps) {
|
||||
if ((interestOps & Channel.OP_READ) != 0) {
|
||||
channel.setInterestOpsNow(Channel.OP_READ);
|
||||
} else {
|
||||
channel.setInterestOpsNow(Channel.OP_NONE);
|
||||
}
|
||||
changed = true;
|
||||
}
|
||||
|
||||
future.setSuccess();
|
||||
if (changed) {
|
||||
synchronized (channel.interestOpsLock) {
|
||||
channel.setInterestOpsNow(interestOps);
|
||||
|
||||
// Notify the worker so it stops or continues reading.
|
||||
Thread currentThread = Thread.currentThread();
|
||||
Thread workerThread = channel.workerThread;
|
||||
if (workerThread != null && currentThread != workerThread) {
|
||||
workerThread.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
fireChannelInterestChanged(channel);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
future.setFailure(t);
|
||||
fireExceptionCaught(channel, t);
|
||||
}
|
||||
}
|
||||
|
||||
static void disconnect(OioDatagramChannel channel, ChannelFuture future) {
|
||||
boolean connected = channel.isConnected();
|
||||
@ -174,32 +108,4 @@ class OioDatagramWorker implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
static void close(OioDatagramChannel channel, ChannelFuture future) {
|
||||
boolean connected = channel.isConnected();
|
||||
boolean bound = channel.isBound();
|
||||
try {
|
||||
channel.socket.close();
|
||||
if (channel.setClosed()) {
|
||||
future.setSuccess();
|
||||
if (connected) {
|
||||
// Notify the worker so it stops reading.
|
||||
Thread currentThread = Thread.currentThread();
|
||||
Thread workerThread = channel.workerThread;
|
||||
if (workerThread != null && currentThread != workerThread) {
|
||||
workerThread.interrupt();
|
||||
}
|
||||
fireChannelDisconnected(channel);
|
||||
}
|
||||
if (bound) {
|
||||
fireChannelUnbound(channel);
|
||||
}
|
||||
fireChannelClosed(channel);
|
||||
} else {
|
||||
future.setSuccess();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
future.setFailure(t);
|
||||
fireExceptionCaught(channel, t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -15,31 +15,25 @@
|
||||
*/
|
||||
package io.netty.channel.socket.oio;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.PushbackInputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketAddress;
|
||||
|
||||
import io.netty.channel.AbstractChannel;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFactory;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelSink;
|
||||
import io.netty.channel.socket.DefaultSocketChannelConfig;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.channel.socket.SocketChannelConfig;
|
||||
|
||||
abstract class OioSocketChannel extends AbstractChannel
|
||||
abstract class OioSocketChannel extends AbstractOioChannel
|
||||
implements SocketChannel {
|
||||
|
||||
final Socket socket;
|
||||
final Object interestOpsLock = new Object();
|
||||
private final SocketChannelConfig config;
|
||||
volatile Thread workerThread;
|
||||
private volatile InetSocketAddress localAddress;
|
||||
private volatile InetSocketAddress remoteAddress;
|
||||
|
||||
OioSocketChannel(
|
||||
Channel parent,
|
||||
@ -59,65 +53,36 @@ abstract class OioSocketChannel extends AbstractChannel
|
||||
return config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getLocalAddress() {
|
||||
InetSocketAddress localAddress = this.localAddress;
|
||||
if (localAddress == null) {
|
||||
try {
|
||||
this.localAddress = localAddress =
|
||||
(InetSocketAddress) socket.getLocalSocketAddress();
|
||||
} catch (Throwable t) {
|
||||
// Sometimes fails on a closed socket in Windows.
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return localAddress;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getRemoteAddress() {
|
||||
InetSocketAddress remoteAddress = this.remoteAddress;
|
||||
if (remoteAddress == null) {
|
||||
try {
|
||||
this.remoteAddress = remoteAddress =
|
||||
(InetSocketAddress) socket.getRemoteSocketAddress();
|
||||
} catch (Throwable t) {
|
||||
// Sometimes fails on a closed socket in Windows.
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return remoteAddress;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isBound() {
|
||||
return isOpen() && socket.isBound();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isConnected() {
|
||||
return isOpen() && socket.isConnected();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean setClosed() {
|
||||
return super.setClosed();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setInterestOpsNow(int interestOps) {
|
||||
super.setInterestOpsNow(interestOps);
|
||||
}
|
||||
|
||||
abstract PushbackInputStream getInputStream();
|
||||
abstract OutputStream getOutputStream();
|
||||
|
||||
@Override
|
||||
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
|
||||
if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
|
||||
return super.write(message, null);
|
||||
} else {
|
||||
return getUnsupportedOperationFuture();
|
||||
}
|
||||
boolean isSocketBound() {
|
||||
return socket.isBound();
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isSocketConnected() {
|
||||
return socket.isConnected();
|
||||
}
|
||||
|
||||
@Override
|
||||
InetSocketAddress getLocalSocketAddress() throws Exception {
|
||||
return (InetSocketAddress) socket.getLocalSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
InetSocketAddress getRemoteSocketAddress() throws Exception {
|
||||
return (InetSocketAddress) socket.getRemoteSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
void closeSocket() throws IOException {
|
||||
socket.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isSocketClosed() {
|
||||
return socket.isClosed();
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ package io.netty.channel.socket.oio;
|
||||
|
||||
import static io.netty.channel.Channels.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.PushbackInputStream;
|
||||
import java.net.SocketException;
|
||||
@ -26,79 +27,38 @@ import java.nio.channels.WritableByteChannel;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import io.netty.buffer.ChannelBuffer;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.FileRegion;
|
||||
|
||||
class OioWorker implements Runnable {
|
||||
class OioWorker extends AbstractOioWorker<OioSocketChannel> {
|
||||
|
||||
private static final Pattern SOCKET_CLOSED_MESSAGE = Pattern.compile(
|
||||
"^.*(?:Socket.*closed).*$", Pattern.CASE_INSENSITIVE);
|
||||
|
||||
private final OioSocketChannel channel;
|
||||
|
||||
OioWorker(OioSocketChannel channel) {
|
||||
this.channel = channel;
|
||||
super(channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
channel.workerThread = Thread.currentThread();
|
||||
final PushbackInputStream in = channel.getInputStream();
|
||||
boolean fireOpen = channel instanceof OioAcceptedSocketChannel;
|
||||
|
||||
while (channel.isOpen()) {
|
||||
if (fireOpen) {
|
||||
fireOpen = false;
|
||||
fireChannelConnected(channel, channel.getRemoteAddress());
|
||||
boolean process() throws IOException {
|
||||
byte[] buf;
|
||||
int readBytes;
|
||||
PushbackInputStream in = channel.getInputStream();
|
||||
int bytesToRead = in.available();
|
||||
if (bytesToRead > 0) {
|
||||
buf = new byte[bytesToRead];
|
||||
readBytes = in.read(buf);
|
||||
} else {
|
||||
int b = in.read();
|
||||
if (b < 0) {
|
||||
return false;
|
||||
}
|
||||
synchronized (channel.interestOpsLock) {
|
||||
while (!channel.isReadable()) {
|
||||
try {
|
||||
// notify() is not called at all.
|
||||
// close() and setInterestOps() calls Thread.interrupt()
|
||||
channel.interestOpsLock.wait();
|
||||
} catch (InterruptedException e) {
|
||||
if (!channel.isOpen()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
byte[] buf;
|
||||
int readBytes;
|
||||
try {
|
||||
int bytesToRead = in.available();
|
||||
if (bytesToRead > 0) {
|
||||
buf = new byte[bytesToRead];
|
||||
readBytes = in.read(buf);
|
||||
} else {
|
||||
int b = in.read();
|
||||
if (b < 0) {
|
||||
break;
|
||||
}
|
||||
in.unread(b);
|
||||
continue;
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
if (!channel.socket.isClosed()) {
|
||||
fireExceptionCaught(channel, t);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
fireMessageReceived(
|
||||
channel,
|
||||
channel.getConfig().getBufferFactory().getBuffer(buf, 0, readBytes));
|
||||
in.unread(b);
|
||||
return true;
|
||||
}
|
||||
fireMessageReceived(channel, channel.getConfig().getBufferFactory().getBuffer(buf, 0, readBytes));
|
||||
|
||||
// Setting the workerThread to null will prevent any channel
|
||||
// operations from interrupting this thread from now on.
|
||||
channel.workerThread = null;
|
||||
|
||||
// Clean up.
|
||||
close(channel, succeededFuture(channel));
|
||||
return true;
|
||||
}
|
||||
|
||||
static void write(
|
||||
@ -162,71 +122,5 @@ class OioWorker implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
static void setInterestOps(
|
||||
OioSocketChannel channel, ChannelFuture future, int interestOps) {
|
||||
|
||||
// Override OP_WRITE flag - a user cannot change this flag.
|
||||
interestOps &= ~Channel.OP_WRITE;
|
||||
interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
|
||||
|
||||
boolean changed = false;
|
||||
try {
|
||||
if (channel.getInterestOps() != interestOps) {
|
||||
if ((interestOps & Channel.OP_READ) != 0) {
|
||||
channel.setInterestOpsNow(Channel.OP_READ);
|
||||
} else {
|
||||
channel.setInterestOpsNow(Channel.OP_NONE);
|
||||
}
|
||||
changed = true;
|
||||
}
|
||||
|
||||
future.setSuccess();
|
||||
if (changed) {
|
||||
synchronized (channel.interestOpsLock) {
|
||||
channel.setInterestOpsNow(interestOps);
|
||||
|
||||
// Notify the worker so it stops or continues reading.
|
||||
Thread currentThread = Thread.currentThread();
|
||||
Thread workerThread = channel.workerThread;
|
||||
if (workerThread != null && currentThread != workerThread) {
|
||||
workerThread.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
fireChannelInterestChanged(channel);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
future.setFailure(t);
|
||||
fireExceptionCaught(channel, t);
|
||||
}
|
||||
}
|
||||
|
||||
static void close(OioSocketChannel channel, ChannelFuture future) {
|
||||
boolean connected = channel.isConnected();
|
||||
boolean bound = channel.isBound();
|
||||
try {
|
||||
channel.socket.close();
|
||||
if (channel.setClosed()) {
|
||||
future.setSuccess();
|
||||
if (connected) {
|
||||
// Notify the worker so it stops reading.
|
||||
Thread currentThread = Thread.currentThread();
|
||||
Thread workerThread = channel.workerThread;
|
||||
if (workerThread != null && currentThread != workerThread) {
|
||||
workerThread.interrupt();
|
||||
}
|
||||
fireChannelDisconnected(channel);
|
||||
}
|
||||
if (bound) {
|
||||
fireChannelUnbound(channel);
|
||||
}
|
||||
fireChannelClosed(channel);
|
||||
} else {
|
||||
future.setSuccess();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
future.setFailure(t);
|
||||
fireExceptionCaught(channel, t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user