More interfaces for the XNIO transport

This commit is contained in:
Trustin Lee 2009-02-25 15:19:54 +00:00
parent d2274f75da
commit 23b13eeafe
8 changed files with 163 additions and 129 deletions

View File

@ -43,7 +43,7 @@ import org.jboss.netty.util.ConversionUtil;
* @version $Rev$, $Date$
*
*/
class DefaultXnioChannelConfig implements XnioChannelConfig {
final class DefaultXnioChannelConfig implements XnioChannelConfig {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(DefaultXnioChannelConfig.class);

View File

@ -0,0 +1,147 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.xnio;
import static org.jboss.netty.channel.Channels.*;
import java.net.SocketAddress;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.xnio.IoFuture;
import org.jboss.xnio.IoUtils;
import org.jboss.xnio.channels.BoundChannel;
import org.jboss.xnio.channels.BoundServer;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
@SuppressWarnings("unchecked")
final class DefaultXnioServerChannel extends BaseXnioChannel implements XnioServerChannel {
private static final Object bindLock = new Object();
final BoundServer xnioServer;
DefaultXnioServerChannel(
XnioServerChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink, BoundServer xnioServer) {
super(null, factory, pipeline, sink, new DefaultXnioChannelConfig());
this.xnioServer = xnioServer;
fireChannelOpen(this);
}
@Override
public XnioServerChannelFactory getFactory() {
return (XnioServerChannelFactory) super.getFactory();
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return getUnsupportedOperationFuture();
}
@Override
public ChannelFuture disconnect() {
return getUnsupportedOperationFuture();
}
@Override
public int getInterestOps() {
return OP_NONE;
}
@Override
public ChannelFuture setInterestOps(int interestOps) {
return getUnsupportedOperationFuture();
}
@Override
protected void setInterestOpsNow(int interestOps) {
// Ignore.
}
void bindNow(ChannelFuture future, SocketAddress localAddress) {
try {
synchronized (bindLock) {
IoFuture<BoundChannel> bindFuture = xnioServer.bind(localAddress);
for (;;) {
IoFuture.Status bindStatus = bindFuture.await();
switch (bindStatus) {
case WAITING:
// Keep waiting for the result.
continue;
case CANCELLED:
throw new Error("should not reach here");
case DONE:
break;
case FAILED:
throw bindFuture.getException();
default:
throw new Error("should not reach here: " + bindStatus);
}
// Break the loop if done.
break;
}
BoundChannel xnioChannel = bindFuture.get();
this.xnioChannel = xnioChannel;
XnioChannelRegistry.registerServerChannel(this);
}
future.setSuccess();
fireChannelBound(this, getLocalAddress());
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(this, t);
}
}
@Override
void closeNow(ChannelFuture future) {
SocketAddress localAddress = getLocalAddress();
boolean bound = localAddress != null;
try {
future.setSuccess();
if (setClosed()) {
synchronized (bindLock) {
IoUtils.safeClose(xnioChannel);
XnioChannelRegistry.unregisterServerChannel(localAddress);
XnioChannelRegistry.unregisterChannelMapping(this);
}
if (bound) {
fireChannelUnbound(this);
}
fireChannelClosed(this);
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(this, t);
}
}
}

View File

@ -18,7 +18,7 @@ final class XnioAcceptedChannelHandler extends AbstractXnioChannelHandler {
public void handleOpened(java.nio.channels.Channel channel) {
// Get the parent channel
XnioServerChannel parent = null;
DefaultXnioServerChannel parent = null;
if (channel instanceof BoundChannel) {
SocketAddress localAddress = (SocketAddress) ((BoundChannel) channel).getLocalAddress();
parent = XnioChannelRegistry.getServerChannel(localAddress);

View File

@ -37,8 +37,8 @@ import org.jboss.netty.util.ConcurrentIdentityHashMap;
*/
final class XnioChannelRegistry {
private static final ConcurrentMap<SocketAddress, XnioServerChannel> serverChannels =
new ConcurrentHashMap<SocketAddress, XnioServerChannel>();
private static final ConcurrentMap<SocketAddress, DefaultXnioServerChannel> serverChannels =
new ConcurrentHashMap<SocketAddress, DefaultXnioServerChannel>();
private static final ConcurrentMap<java.nio.channels.Channel, BaseXnioChannel> mapping =
new ConcurrentIdentityHashMap<java.nio.channels.Channel, BaseXnioChannel>();
@ -65,7 +65,7 @@ final class XnioChannelRegistry {
}
}
static void registerServerChannel(XnioServerChannel channel) {
static void registerServerChannel(DefaultXnioServerChannel channel) {
SocketAddress localAddress = channel.getLocalAddress();
if (localAddress == null) {
throw new IllegalStateException("cannot register an unbound channel");
@ -82,9 +82,9 @@ final class XnioChannelRegistry {
serverChannels.remove(localAddress);
}
static XnioServerChannel getServerChannel(SocketAddress localAddress) {
static DefaultXnioServerChannel getServerChannel(SocketAddress localAddress) {
// XXX: More IPv4 <-> IPv6 address conversion
XnioServerChannel answer = serverChannels.get(localAddress);
DefaultXnioServerChannel answer = serverChannels.get(localAddress);
if (answer == null && localAddress instanceof InetSocketAddress) {
InetSocketAddress a = (InetSocketAddress) localAddress;
answer = serverChannels.get(new InetSocketAddress(ANY_IPV6, a.getPort()));

View File

@ -7,7 +7,7 @@ import static org.jboss.netty.channel.Channels.*;
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
public class XnioClientChannelHandler extends AbstractXnioChannelHandler {
final class XnioClientChannelHandler extends AbstractXnioChannelHandler {
public void handleOpened(java.nio.channels.Channel channel) {
XnioChannel c = XnioChannelRegistry.getChannel(channel);

View File

@ -22,127 +22,13 @@
*/
package org.jboss.netty.channel.xnio;
import static org.jboss.netty.channel.Channels.*;
import java.net.SocketAddress;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.ServerChannel;
import org.jboss.xnio.IoFuture;
import org.jboss.xnio.IoUtils;
import org.jboss.xnio.channels.BoundChannel;
import org.jboss.xnio.channels.BoundServer;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
@SuppressWarnings("unchecked")
final class XnioServerChannel extends BaseXnioChannel implements ServerChannel {
private static final Object bindLock = new Object();
final BoundServer xnioServer;
XnioServerChannel(
XnioServerChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink, BoundServer xnioServer) {
super(null, factory, pipeline, sink, new DefaultXnioChannelConfig());
this.xnioServer = xnioServer;
fireChannelOpen(this);
}
@Override
public XnioServerChannelFactory getFactory() {
return (XnioServerChannelFactory) super.getFactory();
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return getUnsupportedOperationFuture();
}
@Override
public ChannelFuture disconnect() {
return getUnsupportedOperationFuture();
}
@Override
public int getInterestOps() {
return OP_NONE;
}
@Override
public ChannelFuture setInterestOps(int interestOps) {
return getUnsupportedOperationFuture();
}
@Override
protected void setInterestOpsNow(int interestOps) {
// Ignore.
}
void bindNow(ChannelFuture future, SocketAddress localAddress) {
try {
synchronized (bindLock) {
IoFuture<BoundChannel> bindFuture = xnioServer.bind(localAddress);
for (;;) {
IoFuture.Status bindStatus = bindFuture.await();
switch (bindStatus) {
case WAITING:
// Keep waiting for the result.
continue;
case CANCELLED:
throw new Error("should not reach here");
case DONE:
break;
case FAILED:
throw bindFuture.getException();
default:
throw new Error("should not reach here: " + bindStatus);
}
// Break the loop if done.
break;
}
BoundChannel xnioChannel = bindFuture.get();
this.xnioChannel = xnioChannel;
XnioChannelRegistry.registerServerChannel(this);
}
future.setSuccess();
fireChannelBound(this, getLocalAddress());
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(this, t);
}
}
@Override
void closeNow(ChannelFuture future) {
SocketAddress localAddress = getLocalAddress();
boolean bound = localAddress != null;
try {
future.setSuccess();
if (setClosed()) {
synchronized (bindLock) {
IoUtils.safeClose(xnioChannel);
XnioChannelRegistry.unregisterServerChannel(localAddress);
XnioChannelRegistry.unregisterChannelMapping(this);
}
if (bound) {
fireChannelUnbound(this);
}
fireChannelClosed(this);
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(this, t);
}
}
public interface XnioServerChannel extends XnioChannel, ServerChannel {
XnioServerChannelFactory getFactory();
}

View File

@ -23,7 +23,6 @@
package org.jboss.netty.channel.xnio;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ServerChannel;
import org.jboss.netty.channel.ServerChannelFactory;
import org.jboss.xnio.channels.BoundServer;
@ -46,8 +45,8 @@ public class XnioServerChannelFactory implements ServerChannelFactory {
sink = new XnioServerChannelSink();
}
public ServerChannel newChannel(ChannelPipeline pipeline) {
return new XnioServerChannel(this, pipeline, sink, xnioServer);
public XnioServerChannel newChannel(ChannelPipeline pipeline) {
return new DefaultXnioServerChannel(this, pipeline, sink, xnioServer);
}
public void releaseExternalResources() {

View File

@ -48,10 +48,12 @@ final class XnioServerChannelSink extends AbstractChannelSink {
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
Channel channel = e.getChannel();
if (channel instanceof XnioServerChannel) {
if (channel instanceof DefaultXnioServerChannel) {
handleServerSocket(e);
} else if (channel instanceof XnioChannel) {
acceptedChannelSink.eventSunk(pipeline, e);
} else {
throw new Error("should not reach here");
}
}
@ -61,7 +63,7 @@ final class XnioServerChannelSink extends AbstractChannelSink {
}
ChannelStateEvent event = (ChannelStateEvent) e;
XnioServerChannel channel = (XnioServerChannel) event.getChannel();
DefaultXnioServerChannel channel = (DefaultXnioServerChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();