Introduced more interfaces to the XNIO transport

This commit is contained in:
Trustin Lee 2009-02-25 15:11:26 +00:00
parent 13c68c7643
commit d2274f75da
11 changed files with 490 additions and 163 deletions

View File

@ -35,7 +35,7 @@ public abstract class AbstractXnioChannelHandler implements IoHandler<java.nio.c
} }
public void handleReadable(java.nio.channels.Channel channel) { public void handleReadable(java.nio.channels.Channel channel) {
XnioChannel c = XnioChannelRegistry.getChannel(channel); BaseXnioChannel c = XnioChannelRegistry.getChannel(channel);
boolean closed = false; boolean closed = false;
@ -100,7 +100,7 @@ public abstract class AbstractXnioChannelHandler implements IoHandler<java.nio.c
} }
public void handleWritable(java.nio.channels.Channel channel) { public void handleWritable(java.nio.channels.Channel channel) {
XnioChannel c = XnioChannelRegistry.getChannel(channel); BaseXnioChannel c = XnioChannelRegistry.getChannel(channel);
if (channel instanceof GatheringByteChannel) { if (channel instanceof GatheringByteChannel) {
boolean open = true; boolean open = true;
boolean addOpWrite = false; boolean addOpWrite = false;
@ -129,8 +129,7 @@ public abstract class AbstractXnioChannelHandler implements IoHandler<java.nio.c
} }
try { try {
// TODO: Use configuration final int writeSpinCount = c.getConfig().getWriteSpinCount();
final int writeSpinCount = 4;
for (int i = writeSpinCount; i > 0; i --) { for (int i = writeSpinCount; i > 0; i --) {
int localWrittenBytes = buf.getBytes( int localWrittenBytes = buf.getBytes(
bufIdx, bufIdx,
@ -193,7 +192,7 @@ public abstract class AbstractXnioChannelHandler implements IoHandler<java.nio.c
} }
} }
protected void close(XnioChannel c) { protected void close(BaseXnioChannel c) {
if (c != null) { if (c != null) {
c.closeNow(c.getCloseFuture()); c.closeNow(c.getCloseFuture());
} }

View File

@ -0,0 +1,163 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.xnio;
import static org.jboss.netty.channel.Channels.*;
import java.net.SocketAddress;
import java.nio.channels.GatheringByteChannel;
import java.util.Queue;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.util.LinkedTransferQueue;
import org.jboss.xnio.IoUtils;
import org.jboss.xnio.channels.BoundChannel;
import org.jboss.xnio.channels.ConnectedChannel;
import org.jboss.xnio.channels.MultipointWritableMessageChannel;
import org.jboss.xnio.channels.WritableMessageChannel;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev: 937 $, $Date: 2009-02-25 19:43:03 +0900 (Wed, 25 Feb 2009) $
*/
@SuppressWarnings("unchecked")
class BaseXnioChannel extends AbstractChannel implements XnioChannel {
private final XnioChannelConfig config;
volatile java.nio.channels.Channel xnioChannel;
final Object writeLock = new Object();
final Queue<MessageEvent> writeBuffer = new LinkedTransferQueue<MessageEvent>();
MessageEvent currentWriteEvent;
int currentWriteIndex;
// TODO implement high / low water mark
/**
* @param parent
* @param factory
* @param pipeline
* @param sink
*/
BaseXnioChannel(
Channel parent, ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink,
XnioChannelConfig config) {
super(parent, factory, pipeline, sink);
this.config = config;
}
public XnioChannelConfig getConfig() {
return config;
}
public SocketAddress getLocalAddress() {
java.nio.channels.Channel xnioChannel = this.xnioChannel;
if (!isOpen() || !(xnioChannel instanceof BoundChannel)) {
return null;
}
return (SocketAddress) ((BoundChannel) xnioChannel).getLocalAddress();
}
public SocketAddress getRemoteAddress() {
java.nio.channels.Channel xnioChannel = this.xnioChannel;
if (!isOpen() || !(xnioChannel instanceof ConnectedChannel)) {
return null;
}
return (SocketAddress) ((ConnectedChannel) xnioChannel).getPeerAddress();
}
public boolean isBound() {
return getLocalAddress() != null;
}
public boolean isConnected() {
return getRemoteAddress() != null;
}
@Override
public ChannelFuture write(Object message) {
java.nio.channels.Channel xnioChannel = this.xnioChannel;
if (xnioChannel instanceof MultipointWritableMessageChannel) {
SocketAddress remoteAddress = getRemoteAddress();
if (remoteAddress != null) {
return write(message, remoteAddress);
} else {
return getUnsupportedOperationFuture();
}
}
if (xnioChannel instanceof GatheringByteChannel ||
xnioChannel instanceof WritableMessageChannel) {
return super.write(message);
} else {
return getUnsupportedOperationFuture();
}
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
if (remoteAddress == null) {
return write(message);
}
java.nio.channels.Channel xnioChannel = this.xnioChannel;
if (xnioChannel instanceof MultipointWritableMessageChannel) {
return super.write(message);
} else {
return getUnsupportedOperationFuture();
}
}
void closeNow(ChannelFuture future) {
SocketAddress localAddress = getLocalAddress();
SocketAddress remoteAddress = getRemoteAddress();
if (!setClosed()) {
future.setSuccess();
return;
}
IoUtils.safeClose(xnioChannel);
xnioChannel = null;
XnioChannelRegistry.unregisterChannelMapping(this);
if (remoteAddress != null) {
fireChannelDisconnected(this);
}
if (localAddress != null) {
fireChannelUnbound(this);
}
fireChannelClosed(this);
}
}

View File

@ -0,0 +1,206 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.xnio;
import java.util.Map;
import java.util.Map.Entry;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.buffer.HeapChannelBufferFactory;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.DefaultReceiveBufferSizePredictor;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ConversionUtil;
/**
* The default {@link XnioChannelConfig} implementation.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
*
* @version $Rev$, $Date$
*
*/
class DefaultXnioChannelConfig implements XnioChannelConfig {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(DefaultXnioChannelConfig.class);
private volatile ChannelBufferFactory bufferFactory = HeapChannelBufferFactory.getInstance();
private volatile ChannelPipelineFactory pipelineFactory;
private volatile int writeBufferHighWaterMark = 64 * 1024;
private volatile int writeBufferLowWaterMark = 32 * 1024;
private volatile ReceiveBufferSizePredictor predictor =
new DefaultReceiveBufferSizePredictor();
private volatile int writeSpinCount = 16;
DefaultXnioChannelConfig() {
super();
}
public void setOptions(Map<String, Object> options) {
for (Entry<String, Object> e: options.entrySet()) {
setOption(e.getKey(), e.getValue());
}
if (getWriteBufferHighWaterMark() < getWriteBufferLowWaterMark()) {
// Recover the integrity of the configuration with a sensible value.
setWriteBufferLowWaterMark0(getWriteBufferHighWaterMark() >>> 1);
// Notify the user about misconfiguration.
logger.warn(
"writeBufferLowWaterMark cannot be greater than " +
"writeBufferHighWaterMark; setting to the half of the " +
"writeBufferHighWaterMark.");
}
}
public boolean setOption(String key, Object value) {
if (key.equals("pipelineFactory")) {
setPipelineFactory((ChannelPipelineFactory) value);
} else if (key.equals("bufferFactory")) {
setBufferFactory((ChannelBufferFactory) value);
} else if (key.equals("writeBufferHighWaterMark")) {
setWriteBufferHighWaterMark0(ConversionUtil.toInt(value));
} else if (key.equals("writeBufferLowWaterMark")) {
setWriteBufferLowWaterMark0(ConversionUtil.toInt(value));
} else if (key.equals("writeSpinCount")) {
setWriteSpinCount(ConversionUtil.toInt(value));
} else if (key.equals("receiveBufferSizePredictor")) {
setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value);
} else {
return false;
}
return true;
}
public int getWriteBufferHighWaterMark() {
return writeBufferHighWaterMark;
}
public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
if (writeBufferHighWaterMark < getWriteBufferLowWaterMark()) {
throw new IllegalArgumentException(
"writeBufferHighWaterMark cannot be less than " +
"writeBufferLowWaterMark (" + getWriteBufferLowWaterMark() + "): " +
writeBufferHighWaterMark);
}
setWriteBufferHighWaterMark0(writeBufferHighWaterMark);
}
private void setWriteBufferHighWaterMark0(int writeBufferHighWaterMark) {
if (writeBufferHighWaterMark < 0) {
throw new IllegalArgumentException(
"writeBufferHighWaterMark: " + writeBufferHighWaterMark);
}
this.writeBufferHighWaterMark = writeBufferHighWaterMark;
}
public int getWriteBufferLowWaterMark() {
return writeBufferLowWaterMark;
}
public void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
if (writeBufferLowWaterMark > getWriteBufferHighWaterMark()) {
throw new IllegalArgumentException(
"writeBufferLowWaterMark cannot be greater than " +
"writeBufferHighWaterMark (" + getWriteBufferHighWaterMark() + "): " +
writeBufferLowWaterMark);
}
setWriteBufferLowWaterMark0(writeBufferLowWaterMark);
}
private void setWriteBufferLowWaterMark0(int writeBufferLowWaterMark) {
if (writeBufferLowWaterMark < 0) {
throw new IllegalArgumentException(
"writeBufferLowWaterMark: " + writeBufferLowWaterMark);
}
this.writeBufferLowWaterMark = writeBufferLowWaterMark;
}
public int getWriteSpinCount() {
return writeSpinCount;
}
public void setWriteSpinCount(int writeSpinCount) {
if (writeSpinCount <= 0) {
throw new IllegalArgumentException(
"writeSpinCount must be a positive integer.");
}
this.writeSpinCount = writeSpinCount;
}
public ReceiveBufferSizePredictor getReceiveBufferSizePredictor() {
return predictor;
}
public void setReceiveBufferSizePredictor(
ReceiveBufferSizePredictor predictor) {
if (predictor == null) {
throw new NullPointerException("predictor");
}
this.predictor = predictor;
}
public ChannelPipelineFactory getPipelineFactory() {
return pipelineFactory;
}
public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) {
if (pipelineFactory == null) {
throw new NullPointerException("pipelineFactory");
}
this.pipelineFactory = pipelineFactory;
}
public ChannelBufferFactory getBufferFactory() {
return bufferFactory;
}
public void setBufferFactory(ChannelBufferFactory bufferFactory) {
if (bufferFactory == null) {
throw new NullPointerException("bufferFactory");
}
this.bufferFactory = bufferFactory;
}
public int getConnectTimeoutMillis() {
return 0;
}
public void setConnectTimeoutMillis(int connectTimeoutMillis) {
// Preconfigured by XNIO.
}
@Deprecated
public int getWriteTimeoutMillis() {
return 0;
}
@Deprecated
public void setWriteTimeoutMillis(int writeTimeoutMillis) {
// Unused.
}
}

View File

@ -29,20 +29,19 @@ import java.net.SocketAddress;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.DefaultChannelConfig;
/** /**
* @author The Netty Project (netty-dev@lists.jboss.org) * @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com) * @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$ * @version $Rev$, $Date$
*/ */
final class XnioAcceptedChannel extends XnioChannel { final class XnioAcceptedChannel extends BaseXnioChannel {
XnioAcceptedChannel( XnioAcceptedChannel(
XnioServerChannel parent, XnioServerChannel parent,
XnioServerChannelFactory factory, XnioServerChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink) { ChannelPipeline pipeline, ChannelSink sink) {
super(parent, factory, pipeline, sink, new DefaultChannelConfig()); super(parent, factory, pipeline, sink, new DefaultXnioChannelConfig());
fireChannelOpen(this); fireChannelOpen(this);
} }

View File

@ -41,7 +41,7 @@ final class XnioAcceptedChannelHandler extends AbstractXnioChannelHandler {
} else { } else {
// Accepted child channel // Accepted child channel
try { try {
XnioChannel c = new XnioAcceptedChannel( BaseXnioChannel c = new XnioAcceptedChannel(
parent, parent.getFactory(), parent, parent.getFactory(),
parent.getConfig().getPipelineFactory().getPipeline(), parent.getConfig().getPipelineFactory().getPipeline(),
parent.getFactory().sink); parent.getFactory().sink);

View File

@ -22,143 +22,13 @@
*/ */
package org.jboss.netty.channel.xnio; package org.jboss.netty.channel.xnio;
import static org.jboss.netty.channel.Channels.*;
import java.net.SocketAddress;
import java.nio.channels.GatheringByteChannel;
import java.util.Queue;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelConfig;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.util.LinkedTransferQueue;
import org.jboss.xnio.IoUtils;
import org.jboss.xnio.channels.BoundChannel;
import org.jboss.xnio.channels.ConnectedChannel;
import org.jboss.xnio.channels.MultipointWritableMessageChannel;
import org.jboss.xnio.channels.WritableMessageChannel;
/** /**
* @author The Netty Project (netty-dev@lists.jboss.org) * @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com) * @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$ * @version $Rev$, $Date$
*/ */
@SuppressWarnings("unchecked") public interface XnioChannel extends Channel {
class XnioChannel extends AbstractChannel { XnioChannelConfig getConfig();
private final ChannelConfig config;
volatile java.nio.channels.Channel xnioChannel;
final Object writeLock = new Object();
final Queue<MessageEvent> writeBuffer = new LinkedTransferQueue<MessageEvent>();
MessageEvent currentWriteEvent;
int currentWriteIndex;
// TODO implement high / low water mark
/**
* @param parent
* @param factory
* @param pipeline
* @param sink
*/
XnioChannel(
Channel parent, ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink,
ChannelConfig config) {
super(parent, factory, pipeline, sink);
this.config = config;
}
public ChannelConfig getConfig() {
return config;
}
public SocketAddress getLocalAddress() {
java.nio.channels.Channel xnioChannel = this.xnioChannel;
if (!isOpen() || !(xnioChannel instanceof BoundChannel)) {
return null;
}
return (SocketAddress) ((BoundChannel) xnioChannel).getLocalAddress();
}
public SocketAddress getRemoteAddress() {
java.nio.channels.Channel xnioChannel = this.xnioChannel;
if (!isOpen() || !(xnioChannel instanceof ConnectedChannel)) {
return null;
}
return (SocketAddress) ((ConnectedChannel) xnioChannel).getPeerAddress();
}
public boolean isBound() {
return getLocalAddress() != null;
}
public boolean isConnected() {
return getRemoteAddress() != null;
}
@Override
public ChannelFuture write(Object message) {
java.nio.channels.Channel xnioChannel = this.xnioChannel;
if (xnioChannel instanceof MultipointWritableMessageChannel) {
SocketAddress remoteAddress = getRemoteAddress();
if (remoteAddress != null) {
return write(message, remoteAddress);
} else {
return getUnsupportedOperationFuture();
}
}
if (xnioChannel instanceof GatheringByteChannel ||
xnioChannel instanceof WritableMessageChannel) {
return super.write(message);
} else {
return getUnsupportedOperationFuture();
}
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
if (remoteAddress == null) {
return write(message);
}
java.nio.channels.Channel xnioChannel = this.xnioChannel;
if (xnioChannel instanceof MultipointWritableMessageChannel) {
return super.write(message);
} else {
return getUnsupportedOperationFuture();
}
}
void closeNow(ChannelFuture future) {
SocketAddress localAddress = getLocalAddress();
SocketAddress remoteAddress = getRemoteAddress();
if (!setClosed()) {
future.setSuccess();
return;
}
IoUtils.safeClose(xnioChannel);
xnioChannel = null;
XnioChannelRegistry.unregisterChannelMapping(this);
if (remoteAddress != null) {
fireChannelDisconnected(this);
}
if (localAddress != null) {
fireChannelUnbound(this);
}
fireChannelClosed(this);
}
} }

View File

@ -0,0 +1,101 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.xnio;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import org.jboss.netty.channel.ChannelConfig;
import org.jboss.netty.channel.DefaultReceiveBufferSizePredictor;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import org.jboss.netty.channel.socket.SocketChannelConfig;
/**
* A {@link ChannelConfig} for an {@link XnioChannel}.
*
* <h3>Available options</h3>
*
* In addition to the options provided by {@link ChannelConfig} and
* {@link SocketChannelConfig}, {@link XnioChannelConfig} allows the
* following options in the option map:
*
* <table border="1" cellspacing="0" cellpadding="6">
* <tr>
* <th>Name</th><th>Associated setter method</th>
* </tr><tr>
* <td>{@code "writeSpinCount"}</td><td>{@link #setWriteSpinCount(int)}</td>
* </tr><tr>
* <td>{@code "receiveBufferSizePredictor"}</td><td>{@link #setReceiveBufferSizePredictor(ReceiveBufferSizePredictor)}</td>
* </tr>
* </table>
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
*
* @version $Rev$, $Date$
*
* @apiviz.has org.jboss.netty.channel.socket.nio.ReceiveBufferSizePredictor
*/
public interface XnioChannelConfig extends ChannelConfig {
int getWriteBufferHighWaterMark();
void setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
int getWriteBufferLowWaterMark();
void setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
/**
* Returns the maximum loop count for a write operation until
* {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on
* the platform that JVM runs on. The default value is {@code 16}.
*/
int getWriteSpinCount();
/**
* Sets the maximum loop count for a write operation until
* {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on
* the platform that JVM runs on. The default value is {@code 16}.
*
* @throws IllegalArgumentException
* if the specified value is {@code 0} or less than {@code 0}
*/
void setWriteSpinCount(int writeSpinCount);
/**
* Returns the {@link ReceiveBufferSizePredictor} which predicts the
* number of readable bytes in the socket receive buffer. The default
* predictor is {@link DefaultReceiveBufferSizePredictor}.
*/
ReceiveBufferSizePredictor getReceiveBufferSizePredictor();
/**
* Sets the {@link ReceiveBufferSizePredictor} which predicts the
* number of readable bytes in the socket receive buffer. The default
* predictor is {@link DefaultReceiveBufferSizePredictor}.
*/
void setReceiveBufferSizePredictor(ReceiveBufferSizePredictor predictor);
}

View File

@ -27,7 +27,6 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.util.ConcurrentHashMap; import org.jboss.netty.util.ConcurrentHashMap;
import org.jboss.netty.util.ConcurrentIdentityHashMap; import org.jboss.netty.util.ConcurrentIdentityHashMap;
@ -40,8 +39,8 @@ final class XnioChannelRegistry {
private static final ConcurrentMap<SocketAddress, XnioServerChannel> serverChannels = private static final ConcurrentMap<SocketAddress, XnioServerChannel> serverChannels =
new ConcurrentHashMap<SocketAddress, XnioServerChannel>(); new ConcurrentHashMap<SocketAddress, XnioServerChannel>();
private static final ConcurrentMap<java.nio.channels.Channel, XnioChannel> mapping = private static final ConcurrentMap<java.nio.channels.Channel, BaseXnioChannel> mapping =
new ConcurrentIdentityHashMap<java.nio.channels.Channel, XnioChannel>(); new ConcurrentIdentityHashMap<java.nio.channels.Channel, BaseXnioChannel>();
private static final InetAddress ANY_IPV4; private static final InetAddress ANY_IPV4;
private static final InetAddress ANY_IPV6; private static final InetAddress ANY_IPV6;
@ -96,28 +95,20 @@ final class XnioChannelRegistry {
return answer; return answer;
} }
static void registerChannelMapping(XnioChannel channel) { static void registerChannelMapping(BaseXnioChannel channel) {
if (mapping.putIfAbsent(channel.xnioChannel, channel) != null) { if (mapping.putIfAbsent(channel.xnioChannel, channel) != null) {
throw new IllegalStateException("duplicate mapping: " + channel); throw new IllegalStateException("duplicate mapping: " + channel);
} }
} }
static void unregisterChannelMapping(Channel channel) { static void unregisterChannelMapping(BaseXnioChannel channel) {
java.nio.channels.Channel xnioChannel; java.nio.channels.Channel xnioChannel = channel.xnioChannel;
if (channel instanceof XnioChannel) {
xnioChannel = ((XnioChannel) channel).xnioChannel;
} else if (channel instanceof XnioServerChannel) {
xnioChannel = ((XnioServerChannel) channel).xnioChannel;
} else {
throw new Error("should not reach here");
}
if (xnioChannel != null) { if (xnioChannel != null) {
mapping.remove(xnioChannel); mapping.remove(xnioChannel);
} }
} }
static XnioChannel getChannel(java.nio.channels.Channel channel) { static BaseXnioChannel getChannel(java.nio.channels.Channel channel) {
return mapping.get(channel); return mapping.get(channel);
} }

View File

@ -26,7 +26,6 @@ import static org.jboss.netty.channel.Channels.*;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.DefaultServerChannelConfig;
import org.jboss.xnio.Connector; import org.jboss.xnio.Connector;
/** /**
@ -35,7 +34,7 @@ import org.jboss.xnio.Connector;
* @version $Rev$, $Date$ * @version $Rev$, $Date$
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final class XnioClientChannel extends XnioChannel { final class XnioClientChannel extends BaseXnioChannel {
final Object connectLock = new Object(); final Object connectLock = new Object();
final Connector xnioConnector; final Connector xnioConnector;
@ -44,7 +43,7 @@ final class XnioClientChannel extends XnioChannel {
XnioClientChannel( XnioClientChannel(
XnioClientChannelFactory factory, XnioClientChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink, Connector xnioConnector) { ChannelPipeline pipeline, ChannelSink sink, Connector xnioConnector) {
super(null, factory, pipeline, sink, new DefaultServerChannelConfig()); super(null, factory, pipeline, sink, new DefaultXnioChannelConfig());
this.xnioConnector = xnioConnector; this.xnioConnector = xnioConnector;
fireChannelOpen(this); fireChannelOpen(this);
} }

View File

@ -60,7 +60,7 @@ final class XnioClientChannelSink extends AbstractChannelSink {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void eventSunk( public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception { ChannelPipeline pipeline, ChannelEvent e) throws Exception {
XnioChannel channel = (XnioChannel) e.getChannel(); BaseXnioChannel channel = (BaseXnioChannel) e.getChannel();
if (e instanceof ChannelStateEvent) { if (e instanceof ChannelStateEvent) {
ChannelStateEvent event = (ChannelStateEvent) e; ChannelStateEvent event = (ChannelStateEvent) e;
ChannelFuture future = event.getFuture(); ChannelFuture future = event.getFuture();

View File

@ -29,7 +29,6 @@ import java.net.SocketAddress;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.DefaultServerChannelConfig;
import org.jboss.netty.channel.ServerChannel; import org.jboss.netty.channel.ServerChannel;
import org.jboss.xnio.IoFuture; import org.jboss.xnio.IoFuture;
import org.jboss.xnio.IoUtils; import org.jboss.xnio.IoUtils;
@ -42,7 +41,7 @@ import org.jboss.xnio.channels.BoundServer;
* @version $Rev$, $Date$ * @version $Rev$, $Date$
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final class XnioServerChannel extends XnioChannel implements ServerChannel { final class XnioServerChannel extends BaseXnioChannel implements ServerChannel {
private static final Object bindLock = new Object(); private static final Object bindLock = new Object();
@ -51,7 +50,7 @@ final class XnioServerChannel extends XnioChannel implements ServerChannel {
XnioServerChannel( XnioServerChannel(
XnioServerChannelFactory factory, XnioServerChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink, BoundServer xnioServer) { ChannelPipeline pipeline, ChannelSink sink, BoundServer xnioServer) {
super(null, factory, pipeline, sink, new DefaultServerChannelConfig()); super(null, factory, pipeline, sink, new DefaultXnioChannelConfig());
this.xnioServer = xnioServer; this.xnioServer = xnioServer;
fireChannelOpen(this); fireChannelOpen(this);
} }