Removed outdated XNIO transport - it needs to be rewritten in the next major feature release.

This commit is contained in:
Trustin Lee 2010-03-24 04:00:49 +00:00
parent 0d3389ee52
commit 807291a2db
19 changed files with 0 additions and 1798 deletions

16
pom.xml
View File

@ -63,22 +63,6 @@
</scm>
<dependencies>
<!-- JBoss XNIO - completely optional -->
<dependency>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-api</artifactId>
<version>1.2.1.GA</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-nio</artifactId>
<version>1.2.1.GA</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<!-- Google Protocol Buffers - completely optional -->
<dependency>
<groupId>com.google.protobuf</groupId>

View File

@ -1,246 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.xnio;
import static org.jboss.netty.channel.Channels.*;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.util.Queue;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import org.jboss.xnio.IoHandler;
import org.jboss.xnio.channels.MultipointReadResult;
import org.jboss.xnio.channels.MultipointReadableMessageChannel;
import org.jboss.xnio.channels.MultipointWritableMessageChannel;
import org.jboss.xnio.channels.ReadableMessageChannel;
import org.jboss.xnio.channels.SuspendableReadChannel;
import org.jboss.xnio.channels.SuspendableWriteChannel;
import org.jboss.xnio.channels.WritableMessageChannel;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$
*/
@SuppressWarnings("unchecked")
abstract class AbstractXnioChannelHandler implements IoHandler<java.nio.channels.Channel> {
protected AbstractXnioChannelHandler() {
super();
}
public void handleReadable(java.nio.channels.Channel channel) {
BaseXnioChannel c = XnioChannelRegistry.getChannel(channel);
boolean closed = false;
ReceiveBufferSizePredictor predictor = c.getConfig().getReceiveBufferSizePredictor();
ChannelBufferFactory bufferFactory = c.getConfig().getBufferFactory();
ChannelBuffer buf = bufferFactory.getBuffer(predictor.nextReceiveBufferSize());
SocketAddress remoteAddress = null;
Throwable exception = null;
if (channel instanceof ScatteringByteChannel) {
try {
while (buf.writable()) {
int readBytes = buf.writeBytes((ScatteringByteChannel) channel, buf.writableBytes());
if (readBytes == 0) {
break;
} else if (readBytes < 0) {
closed = true;
break;
}
}
} catch (IOException e) {
exception = e;
closed = true;
}
} else if (channel instanceof MultipointReadableMessageChannel) {
ByteBuffer nioBuf = buf.toByteBuffer();
try {
MultipointReadResult res = ((MultipointReadableMessageChannel) channel).receive(nioBuf);
if (res != null) {
buf = ChannelBuffers.wrappedBuffer(nioBuf);
remoteAddress = (SocketAddress) res.getSourceAddress();
}
} catch (IOException e) {
exception = e;
closed = true;
}
} else if (channel instanceof ReadableMessageChannel) {
ByteBuffer nioBuf = buf.toByteBuffer();
try {
int readBytes = ((ReadableMessageChannel) channel).receive(nioBuf);
if (readBytes > 0) {
buf = ChannelBuffers.wrappedBuffer(nioBuf);
} else if (readBytes < 0) {
closed = true;
}
} catch (IOException e) {
exception = e;
closed = true;
}
}
if (buf.readable()) {
// Update the predictor.
predictor.previousReceiveBufferSize(buf.readableBytes());
// Fire the event.
fireMessageReceived(c, buf, remoteAddress);
}
if (exception != null) {
fireExceptionCaught(c, exception);
}
if (closed) {
close(c);
} else {
resumeRead(channel);
}
}
public void handleWritable(java.nio.channels.Channel channel) {
BaseXnioChannel c = XnioChannelRegistry.getChannel(channel);
int writtenBytes = 0;
boolean open = true;
boolean addOpWrite = false;
MessageEvent evt;
ChannelBuffer buf;
int bufIdx;
Queue<MessageEvent> writeBuffer = c.writeBuffer;
synchronized (c.writeLock) {
evt = c.currentWriteEvent;
for (;;) {
if (evt == null) {
evt = writeBuffer.poll();
if (evt == null) {
c.currentWriteEvent = null;
break;
}
buf = (ChannelBuffer) evt.getMessage();
bufIdx = buf.readerIndex();
} else {
buf = (ChannelBuffer) evt.getMessage();
bufIdx = c.currentWriteIndex;
}
try {
final int writeSpinCount = c.getConfig().getWriteSpinCount();
boolean sent = false;
for (int i = writeSpinCount; i > 0; i --) {
if (channel instanceof GatheringByteChannel) {
int localWrittenBytes = buf.getBytes(
bufIdx,
(GatheringByteChannel) channel,
buf.writerIndex() - bufIdx);
if (localWrittenBytes != 0) {
bufIdx += localWrittenBytes;
writtenBytes += localWrittenBytes;
break;
}
} else if (channel instanceof MultipointWritableMessageChannel) {
ByteBuffer nioBuf = buf.toByteBuffer(bufIdx, buf.writerIndex() - bufIdx);
int nioBufSize = nioBuf.remaining();
SocketAddress remoteAddress = evt.getRemoteAddress();
if (remoteAddress == null) {
remoteAddress = c.getRemoteAddress();
}
sent = ((MultipointWritableMessageChannel) channel).send(remoteAddress, nioBuf);
if (sent) {
bufIdx += nioBufSize;
writtenBytes += nioBufSize;
break;
}
} else if (channel instanceof WritableMessageChannel) {
ByteBuffer nioBuf = buf.toByteBuffer(bufIdx, buf.writerIndex() - bufIdx);
int nioBufSize = nioBuf.remaining();
sent = ((WritableMessageChannel) channel).send(nioBuf);
if (sent) {
bufIdx += nioBufSize;
writtenBytes += nioBufSize;
break;
}
} else {
throw new IllegalArgumentException("Unsupported channel type: " + channel.getClass().getName());
}
}
if (bufIdx == buf.writerIndex() || sent) {
// Successful write - proceed to the next message.
evt.getFuture().setSuccess();
evt = null;
} else {
// Not written fully - perhaps the kernel buffer is full.
c.currentWriteEvent = evt;
c.currentWriteIndex = bufIdx;
addOpWrite = true;
break;
}
} catch (AsynchronousCloseException e) {
// Doesn't need a user attention - ignore.
} catch (Throwable t) {
evt.getFuture().setFailure(t);
evt = null;
fireExceptionCaught(c, t);
if (t instanceof IOException) {
open = false;
c.closeNow(succeededFuture(c));
}
}
}
}
if (writtenBytes > 0) {
fireWriteComplete(c, writtenBytes);
}
if (open) {
if (addOpWrite && channel instanceof SuspendableWriteChannel) {
((SuspendableWriteChannel) channel).resumeWrites();
}
}
}
public void handleClosed(java.nio.channels.Channel channel) {
close(XnioChannelRegistry.getChannel(channel));
}
protected void resumeRead(java.nio.channels.Channel channel) {
if (channel instanceof SuspendableReadChannel) {
((SuspendableReadChannel) channel).resumeReads();
}
}
protected void close(BaseXnioChannel c) {
if (c != null) {
c.closeNow(c.getCloseFuture());
}
}
}

View File

@ -1,265 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.xnio;
import static org.jboss.netty.channel.Channels.*;
import java.net.SocketAddress;
import java.nio.channels.GatheringByteChannel;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.ThreadLocalBoolean;
import org.jboss.xnio.IoUtils;
import org.jboss.xnio.channels.BoundChannel;
import org.jboss.xnio.channels.ConnectedChannel;
import org.jboss.xnio.channels.MultipointWritableMessageChannel;
import org.jboss.xnio.channels.WritableMessageChannel;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$
*/
@SuppressWarnings("unchecked")
class BaseXnioChannel extends AbstractChannel implements XnioChannel {
private final XnioChannelConfig config;
volatile java.nio.channels.Channel xnioChannel;
private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
final Object writeLock = new Object();
final Queue<MessageEvent> writeBuffer = new WriteBuffer();
final AtomicInteger writeBufferSize = new AtomicInteger();
final AtomicInteger highWaterMarkCounter = new AtomicInteger();
MessageEvent currentWriteEvent;
int currentWriteIndex;
BaseXnioChannel(
Channel parent, ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink,
XnioChannelConfig config) {
super(parent, factory, pipeline, sink);
this.config = config;
}
public XnioChannelConfig getConfig() {
return config;
}
public SocketAddress getLocalAddress() {
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
java.nio.channels.Channel xnioChannel = this.xnioChannel;
if (!isOpen() || !(xnioChannel instanceof BoundChannel)) {
return null;
}
this.localAddress = localAddress =
(SocketAddress) ((BoundChannel) xnioChannel).getLocalAddress();
}
return localAddress;
}
public SocketAddress getRemoteAddress() {
SocketAddress remoteAddress = this.remoteAddress;
if (remoteAddress == null) {
java.nio.channels.Channel xnioChannel = this.xnioChannel;
if (!isOpen() || !(xnioChannel instanceof ConnectedChannel)) {
return null;
}
this.remoteAddress = remoteAddress =
(SocketAddress) ((ConnectedChannel) xnioChannel).getPeerAddress();
}
return remoteAddress;
}
public boolean isBound() {
return getLocalAddress() != null;
}
public boolean isConnected() {
return getRemoteAddress() != null;
}
@Override
public int getInterestOps() {
if (!isOpen()) {
return Channel.OP_WRITE;
}
int interestOps = getRawInterestOps();
int writeBufferSize = this.writeBufferSize.get();
if (writeBufferSize != 0) {
if (highWaterMarkCounter.get() > 0) {
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (writeBufferSize >= lowWaterMark) {
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;
}
} else {
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (writeBufferSize >= highWaterMark) {
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;
}
}
} else {
interestOps &= ~Channel.OP_WRITE;
}
return interestOps;
}
int getRawInterestOps() {
return super.getInterestOps();
}
void setRawInterestOpsNow(int interestOps) {
super.setInterestOpsNow(interestOps);
}
@Override
public ChannelFuture write(Object message) {
java.nio.channels.Channel xnioChannel = this.xnioChannel;
if (xnioChannel instanceof MultipointWritableMessageChannel) {
SocketAddress remoteAddress = getRemoteAddress();
if (remoteAddress != null) {
return write(message, remoteAddress);
} else {
return getUnsupportedOperationFuture();
}
}
if (xnioChannel instanceof GatheringByteChannel ||
xnioChannel instanceof WritableMessageChannel) {
return super.write(message);
} else {
return getUnsupportedOperationFuture();
}
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
if (remoteAddress == null) {
return write(message);
}
java.nio.channels.Channel xnioChannel = this.xnioChannel;
if (xnioChannel instanceof MultipointWritableMessageChannel) {
return super.write(message);
} else {
return getUnsupportedOperationFuture();
}
}
void closeNow(ChannelFuture future) {
SocketAddress localAddress = getLocalAddress();
SocketAddress remoteAddress = getRemoteAddress();
if (!setClosed()) {
future.setSuccess();
return;
}
try {
IoUtils.safeClose(xnioChannel);
xnioChannel = null;
XnioChannelRegistry.unregisterChannelMapping(this);
future.setSuccess();
if (remoteAddress != null) {
fireChannelDisconnected(this);
}
if (localAddress != null) {
fireChannelUnbound(this);
}
fireChannelClosed(this);
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(this, t);
}
}
private final class WriteBuffer extends LinkedTransferQueue<MessageEvent> {
private static final long serialVersionUID = 9223361436545857472L;
private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
WriteBuffer() {
super();
}
@Override
public boolean offer(MessageEvent e) {
boolean success = super.offer(e);
assert success;
int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes();
int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (newWriteBufferSize >= highWaterMark) {
if (newWriteBufferSize - messageSize < highWaterMark) {
highWaterMarkCounter.incrementAndGet();
if (!notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(BaseXnioChannel.this);
notifying.set(Boolean.FALSE);
}
}
}
return true;
}
@Override
public MessageEvent poll() {
MessageEvent e = super.poll();
if (e != null) {
int messageSize = ((ChannelBuffer) e.getMessage()).readableBytes();
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
if (newWriteBufferSize + messageSize >= lowWaterMark) {
highWaterMarkCounter.decrementAndGet();
if (!notifying.get()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(BaseXnioChannel.this);
notifying.set(Boolean.FALSE);
}
}
}
}
return e;
}
}
}

View File

@ -1,216 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.xnio;
import java.util.Map;
import java.util.Map.Entry;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.buffer.HeapChannelBufferFactory;
import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import org.jboss.netty.channel.ReceiveBufferSizePredictorFactory;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.ConversionUtil;
/**
* The default {@link XnioChannelConfig} implementation.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
*
* @version $Rev$, $Date$
*
*/
final class DefaultXnioChannelConfig implements XnioChannelConfig {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(DefaultXnioChannelConfig.class);
private static final ReceiveBufferSizePredictorFactory DEFAULT_PREDICTOR_FACTORY =
new AdaptiveReceiveBufferSizePredictorFactory();
private volatile ChannelBufferFactory bufferFactory = HeapChannelBufferFactory.getInstance();
private volatile ChannelPipelineFactory pipelineFactory;
private volatile int writeBufferHighWaterMark = 64 * 1024;
private volatile int writeBufferLowWaterMark = 32 * 1024;
private volatile ReceiveBufferSizePredictor predictor;
private volatile ReceiveBufferSizePredictorFactory predictorFactory = DEFAULT_PREDICTOR_FACTORY;
private volatile int writeSpinCount = 16;
DefaultXnioChannelConfig() {
super();
}
public void setOptions(Map<String, Object> options) {
for (Entry<String, Object> e: options.entrySet()) {
setOption(e.getKey(), e.getValue());
}
if (getWriteBufferHighWaterMark() < getWriteBufferLowWaterMark()) {
// Recover the integrity of the configuration with a sensible value.
setWriteBufferLowWaterMark0(getWriteBufferHighWaterMark() >>> 1);
// Notify the user about misconfiguration.
logger.warn(
"writeBufferLowWaterMark cannot be greater than " +
"writeBufferHighWaterMark; setting to the half of the " +
"writeBufferHighWaterMark.");
}
}
public boolean setOption(String key, Object value) {
if (key.equals("pipelineFactory")) {
setPipelineFactory((ChannelPipelineFactory) value);
} else if (key.equals("bufferFactory")) {
setBufferFactory((ChannelBufferFactory) value);
} else if (key.equals("writeBufferHighWaterMark")) {
setWriteBufferHighWaterMark0(ConversionUtil.toInt(value));
} else if (key.equals("writeBufferLowWaterMark")) {
setWriteBufferLowWaterMark0(ConversionUtil.toInt(value));
} else if (key.equals("writeSpinCount")) {
setWriteSpinCount(ConversionUtil.toInt(value));
} else if (key.equals("receiveBufferSizePredictor")) {
setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value);
} else {
return false;
}
return true;
}
public int getWriteBufferHighWaterMark() {
return writeBufferHighWaterMark;
}
public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
if (writeBufferHighWaterMark < getWriteBufferLowWaterMark()) {
throw new IllegalArgumentException(
"writeBufferHighWaterMark cannot be less than " +
"writeBufferLowWaterMark (" + getWriteBufferLowWaterMark() + "): " +
writeBufferHighWaterMark);
}
setWriteBufferHighWaterMark0(writeBufferHighWaterMark);
}
private void setWriteBufferHighWaterMark0(int writeBufferHighWaterMark) {
if (writeBufferHighWaterMark < 0) {
throw new IllegalArgumentException(
"writeBufferHighWaterMark: " + writeBufferHighWaterMark);
}
this.writeBufferHighWaterMark = writeBufferHighWaterMark;
}
public int getWriteBufferLowWaterMark() {
return writeBufferLowWaterMark;
}
public void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
if (writeBufferLowWaterMark > getWriteBufferHighWaterMark()) {
throw new IllegalArgumentException(
"writeBufferLowWaterMark cannot be greater than " +
"writeBufferHighWaterMark (" + getWriteBufferHighWaterMark() + "): " +
writeBufferLowWaterMark);
}
setWriteBufferLowWaterMark0(writeBufferLowWaterMark);
}
private void setWriteBufferLowWaterMark0(int writeBufferLowWaterMark) {
if (writeBufferLowWaterMark < 0) {
throw new IllegalArgumentException(
"writeBufferLowWaterMark: " + writeBufferLowWaterMark);
}
this.writeBufferLowWaterMark = writeBufferLowWaterMark;
}
public int getWriteSpinCount() {
return writeSpinCount;
}
public void setWriteSpinCount(int writeSpinCount) {
if (writeSpinCount <= 0) {
throw new IllegalArgumentException(
"writeSpinCount must be a positive integer.");
}
this.writeSpinCount = writeSpinCount;
}
public ReceiveBufferSizePredictor getReceiveBufferSizePredictor() {
ReceiveBufferSizePredictor predictor = this.predictor;
if (predictor == null) {
try {
this.predictor = predictor = getReceiveBufferSizePredictorFactory().getPredictor();
} catch (Exception e) {
throw new ChannelException(
"Failed to create a new " +
ReceiveBufferSizePredictor.class.getSimpleName() + '.',
e);
}
}
return predictor;
}
public void setReceiveBufferSizePredictor(
ReceiveBufferSizePredictor predictor) {
if (predictor == null) {
throw new NullPointerException("predictor");
}
this.predictor = predictor;
}
public ReceiveBufferSizePredictorFactory getReceiveBufferSizePredictorFactory() {
return predictorFactory;
}
public void setReceiveBufferSizePredictorFactory(ReceiveBufferSizePredictorFactory predictorFactory) {
if (predictorFactory == null) {
throw new NullPointerException("predictorFactory");
}
this.predictorFactory = predictorFactory;
}
public ChannelPipelineFactory getPipelineFactory() {
return pipelineFactory;
}
public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) {
if (pipelineFactory == null) {
throw new NullPointerException("pipelineFactory");
}
this.pipelineFactory = pipelineFactory;
}
public ChannelBufferFactory getBufferFactory() {
return bufferFactory;
}
public void setBufferFactory(ChannelBufferFactory bufferFactory) {
if (bufferFactory == null) {
throw new NullPointerException("bufferFactory");
}
this.bufferFactory = bufferFactory;
}
public int getConnectTimeoutMillis() {
return 0;
}
public void setConnectTimeoutMillis(int connectTimeoutMillis) {
// Preconfigured by XNIO.
}
}

View File

@ -1,142 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.xnio;
import static org.jboss.netty.channel.Channels.*;
import java.net.SocketAddress;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.xnio.IoFuture;
import org.jboss.xnio.IoUtils;
import org.jboss.xnio.channels.BoundChannel;
import org.jboss.xnio.channels.BoundServer;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$
*/
@SuppressWarnings("unchecked")
final class DefaultXnioServerChannel extends BaseXnioChannel implements XnioServerChannel {
private static final Object bindLock = new Object();
final BoundServer xnioServer;
DefaultXnioServerChannel(
XnioServerChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink, BoundServer xnioServer) {
super(null, factory, pipeline, sink, new DefaultXnioChannelConfig());
this.xnioServer = xnioServer;
fireChannelOpen(this);
}
@Override
public XnioServerChannelFactory getFactory() {
return (XnioServerChannelFactory) super.getFactory();
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return getUnsupportedOperationFuture();
}
@Override
public ChannelFuture disconnect() {
return getUnsupportedOperationFuture();
}
@Override
public int getInterestOps() {
return OP_NONE;
}
@Override
public ChannelFuture setInterestOps(int interestOps) {
return getUnsupportedOperationFuture();
}
@Override
protected void setInterestOpsNow(int interestOps) {
// Ignore.
}
void bindNow(ChannelFuture future, SocketAddress localAddress) {
try {
synchronized (bindLock) {
IoFuture<BoundChannel> bindFuture = xnioServer.bind(localAddress);
for (;;) {
IoFuture.Status bindStatus = bindFuture.await();
switch (bindStatus) {
case WAITING:
// Keep waiting for the result.
continue;
case CANCELLED:
throw new Error("should not reach here");
case DONE:
break;
case FAILED:
throw bindFuture.getException();
default:
throw new Error("should not reach here: " + bindStatus);
}
// Break the loop if done.
break;
}
BoundChannel xnioChannel = bindFuture.get();
this.xnioChannel = xnioChannel;
XnioChannelRegistry.registerServerChannel(this);
}
future.setSuccess();
fireChannelBound(this, getLocalAddress());
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(this, t);
}
}
@Override
void closeNow(ChannelFuture future) {
SocketAddress localAddress = getLocalAddress();
boolean bound = localAddress != null;
try {
if (setClosed()) {
future.setSuccess();
synchronized (bindLock) {
IoUtils.safeClose(xnioChannel);
XnioChannelRegistry.unregisterServerChannel(localAddress);
XnioChannelRegistry.unregisterChannelMapping(this);
}
if (bound) {
fireChannelUnbound(this);
}
fireChannelClosed(this);
} else {
future.setSuccess();
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(this, t);
}
}
}

View File

@ -1,55 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.xnio;
import static org.jboss.netty.channel.Channels.*;
import java.net.SocketAddress;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$
*/
final class XnioAcceptedChannel extends BaseXnioChannel {
XnioAcceptedChannel(
XnioServerChannel parent,
XnioServerChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink) {
super(parent, factory, pipeline, sink, new DefaultXnioChannelConfig());
fireChannelOpen(this);
}
@Override
public XnioClientChannelFactory getFactory() {
return (XnioClientChannelFactory) super.getFactory();
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return getUnsupportedOperationFuture();
}
@Override
public ChannelFuture bind(SocketAddress remoteAddress) {
return getUnsupportedOperationFuture();
}
}

View File

@ -1,79 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.xnio;
import static org.jboss.netty.channel.Channels.*;
import java.net.SocketAddress;
import org.jboss.xnio.IoUtils;
import org.jboss.xnio.channels.BoundChannel;
import org.jboss.xnio.channels.MultipointMessageChannel;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$
*/
@SuppressWarnings("unchecked")
final class XnioAcceptedChannelHandler extends AbstractXnioChannelHandler {
public void handleOpened(java.nio.channels.Channel channel) {
// Get the parent channel
DefaultXnioServerChannel parent = null;
if (channel instanceof BoundChannel) {
SocketAddress localAddress = (SocketAddress) ((BoundChannel) channel).getLocalAddress();
parent = XnioChannelRegistry.getServerChannel(localAddress);
if (parent == null) {
// An accepted channel with no parent
// probably a race condition or a port not bound by Netty.
IoUtils.safeClose(channel);
return;
}
} else {
// Should not reach here.
IoUtils.safeClose(channel);
return;
}
if (parent.xnioChannel instanceof MultipointMessageChannel) {
// Multipoint channel
XnioChannelRegistry.registerChannelMapping(parent);
} else {
// Accepted child channel
try {
BaseXnioChannel c = new XnioAcceptedChannel(
parent, parent.getFactory(),
parent.getConfig().getPipelineFactory().getPipeline(),
parent.getFactory().sink);
c.xnioChannel = channel;
fireChannelOpen(c);
if (c.isBound()) {
fireChannelBound(c, c.getLocalAddress());
if (c.isConnected()) {
fireChannelConnected(c, c.getRemoteAddress());
}
}
XnioChannelRegistry.registerChannelMapping(c);
} catch (Throwable t) {
t.printStackTrace();
}
}
// Start to read.
resumeRead(channel);
}
}

View File

@ -1,37 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.xnio;
import org.jboss.xnio.IoHandler;
import org.jboss.xnio.IoHandlerFactory;
import org.jboss.xnio.channels.BoundServer;
/**
* An XNIO {@link IoHandlerFactory} implementation that must be specified when
* you create a {@link BoundServer} to integrate XNIO into Netty.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$
*/
public class XnioAcceptedChannelHandlerFactory implements IoHandlerFactory<java.nio.channels.Channel> {
private final XnioAcceptedChannelHandler handler = new XnioAcceptedChannelHandler();
public IoHandler<java.nio.channels.Channel> createHandler() {
return handler;
}
}

View File

@ -1,30 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.xnio;
import org.jboss.netty.channel.Channel;
/**
* A {@link Channel} which uses <a href="http://www.jboss.org/xnio/">JBoss XNIO</a>
* as its I/O provider.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$
*/
public interface XnioChannel extends Channel {
XnioChannelConfig getConfig();
}

View File

@ -1,116 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.xnio;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictor;
import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.ChannelConfig;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import org.jboss.netty.channel.ReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.socket.SocketChannelConfig;
/**
* A {@link ChannelConfig} for an {@link XnioChannel}.
*
* <h3>Available options</h3>
*
* In addition to the options provided by {@link ChannelConfig} and
* {@link SocketChannelConfig}, {@link XnioChannelConfig} allows the
* following options in the option map:
*
* <table border="1" cellspacing="0" cellpadding="6">
* <tr>
* <th>Name</th><th>Associated setter method</th>
* </tr><tr>
* <td>{@code "writeSpinCount"}</td><td>{@link #setWriteSpinCount(int)}</td>
* </tr><tr>
* <td>{@code "receiveBufferSizePredictor"}</td><td>{@link #setReceiveBufferSizePredictor(ReceiveBufferSizePredictor)}</td>
* </tr><tr>
* <td>{@code "receiveBufferSizePredictorFactory"}</td><td>{@link #setReceiveBufferSizePredictorFactory(ReceiveBufferSizePredictorFactory)}</td>
* </tr>
* </table>
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
*
* @version $Rev$, $Date$
*/
public interface XnioChannelConfig extends ChannelConfig {
int getWriteBufferHighWaterMark();
void setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
int getWriteBufferLowWaterMark();
void setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
/**
* Returns the maximum loop count for a write operation until
* {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on
* the platform that JVM runs on. The default value is {@code 16}.
*/
int getWriteSpinCount();
/**
* Sets the maximum loop count for a write operation until
* {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on
* the platform that JVM runs on. The default value is {@code 16}.
*
* @throws IllegalArgumentException
* if the specified value is {@code 0} or less than {@code 0}
*/
void setWriteSpinCount(int writeSpinCount);
/**
* Returns the {@link ReceiveBufferSizePredictor} which predicts the
* number of readable bytes in the socket receive buffer. The default
* predictor is <tt>{@link AdaptiveReceiveBufferSizePredictor}(64, 1024, 65536)</tt>.
*/
ReceiveBufferSizePredictor getReceiveBufferSizePredictor();
/**
* Sets the {@link ReceiveBufferSizePredictor} which predicts the
* number of readable bytes in the socket receive buffer. The default
* predictor is <tt>{@link AdaptiveReceiveBufferSizePredictor}(64, 1024, 65536)</tt>.
*/
void setReceiveBufferSizePredictor(ReceiveBufferSizePredictor predictor);
/**
* Returns the {@link ReceiveBufferSizePredictorFactory} which creates a new
* {@link ReceiveBufferSizePredictor} when a new channel is created and
* no {@link ReceiveBufferSizePredictor} was set. If no predictor was set
* for the channel, {@link #setReceiveBufferSizePredictor(ReceiveBufferSizePredictor)}
* will be called with the new predictor. The default factory is
* <tt>{@link AdaptiveReceiveBufferSizePredictorFactory}(64, 1024, 65536)</tt>.
*/
ReceiveBufferSizePredictorFactory getReceiveBufferSizePredictorFactory();
/**
* Sets the {@link ReceiveBufferSizePredictor} which creates a new
* {@link ReceiveBufferSizePredictor} when a new channel is created and
* no {@link ReceiveBufferSizePredictor} was set. If no predictor was set
* for the channel, {@link #setReceiveBufferSizePredictor(ReceiveBufferSizePredictor)}
* will be called with the new predictor. The default factory is
* <tt>{@link AdaptiveReceiveBufferSizePredictorFactory}(64, 1024, 65536)</tt>.
*/
void setReceiveBufferSizePredictorFactory(ReceiveBufferSizePredictorFactory predictorFactory);
}

View File

@ -1,111 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.xnio;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentMap;
import org.jboss.netty.util.internal.ConcurrentHashMap;
import org.jboss.netty.util.internal.ConcurrentIdentityHashMap;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$
*/
final class XnioChannelRegistry {
private static final ConcurrentMap<SocketAddress, DefaultXnioServerChannel> serverChannels =
new ConcurrentHashMap<SocketAddress, DefaultXnioServerChannel>();
private static final ConcurrentMap<java.nio.channels.Channel, BaseXnioChannel> mapping =
new ConcurrentIdentityHashMap<java.nio.channels.Channel, BaseXnioChannel>();
private static final InetAddress ANY_IPV4;
private static final InetAddress ANY_IPV6;
static {
InetAddress any4 = null;
try {
any4 = InetAddress.getByAddress(new byte[] { 0, 0, 0, 0 });
} catch (Throwable t) {
// Ignore
} finally {
ANY_IPV4 = any4;
}
InetAddress any6 = null;
try {
any6 = InetAddress.getByAddress(new byte[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 });
} catch (Throwable t) {
// Ignore
} finally {
ANY_IPV6 = any6;
}
}
static void registerServerChannel(DefaultXnioServerChannel channel) {
SocketAddress localAddress = channel.getLocalAddress();
if (localAddress == null) {
throw new IllegalStateException("cannot register an unbound channel");
}
if (serverChannels.putIfAbsent(localAddress, channel) != null) {
throw new IllegalStateException("duplicate local address: " + localAddress);
}
}
static void unregisterServerChannel(SocketAddress localAddress) {
if (localAddress == null) {
return;
}
serverChannels.remove(localAddress);
}
static DefaultXnioServerChannel getServerChannel(SocketAddress localAddress) {
// XXX: More IPv4 <-> IPv6 address conversion
DefaultXnioServerChannel answer = serverChannels.get(localAddress);
if (answer == null && localAddress instanceof InetSocketAddress) {
InetSocketAddress a = (InetSocketAddress) localAddress;
answer = serverChannels.get(new InetSocketAddress(ANY_IPV6, a.getPort()));
if (answer == null) {
answer = serverChannels.get(new InetSocketAddress(ANY_IPV4, a.getPort()));
}
}
return answer;
}
static void registerChannelMapping(BaseXnioChannel channel) {
if (mapping.putIfAbsent(channel.xnioChannel, channel) != null) {
throw new IllegalStateException("duplicate mapping: " + channel);
}
}
static void unregisterChannelMapping(BaseXnioChannel channel) {
java.nio.channels.Channel xnioChannel = channel.xnioChannel;
if (xnioChannel != null) {
mapping.remove(xnioChannel);
}
}
static BaseXnioChannel getChannel(java.nio.channels.Channel channel) {
return mapping.get(channel);
}
private XnioChannelRegistry() {
super();
}
}

View File

@ -1,48 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.xnio;
import static org.jboss.netty.channel.Channels.*;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.xnio.Connector;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$
*/
@SuppressWarnings("unchecked")
final class XnioClientChannel extends BaseXnioChannel {
final Object connectLock = new Object();
final Connector xnioConnector;
volatile boolean connecting;
XnioClientChannel(
XnioClientChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink, Connector xnioConnector) {
super(null, factory, pipeline, sink, new DefaultXnioChannelConfig());
this.xnioConnector = xnioConnector;
fireChannelOpen(this);
}
@Override
public XnioClientChannelFactory getFactory() {
return (XnioClientChannelFactory) super.getFactory();
}
}

View File

@ -1,52 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.xnio;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.xnio.Connector;
/**
* A client-side {@link ChannelFactory} which uses
* <a href="http://www.jboss.org/xnio/">JBoss XNIO</a> as its I/O provider.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$
*/
@SuppressWarnings("unchecked")
public class XnioClientChannelFactory implements ChannelFactory {
private final Connector xnioConnector;
private final XnioClientChannelSink sink;
public XnioClientChannelFactory(Connector xnioConnector) {
if (xnioConnector == null) {
throw new NullPointerException("xnioConnector");
}
this.xnioConnector = xnioConnector;
sink = new XnioClientChannelSink();
}
public Channel newChannel(ChannelPipeline pipeline) {
return new XnioClientChannel(this, pipeline, sink, xnioConnector);
}
public void releaseExternalResources() {
// Nothing to release.
}
}

View File

@ -1,40 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.xnio;
import static org.jboss.netty.channel.Channels.*;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$
*/
final class XnioClientChannelHandler extends AbstractXnioChannelHandler {
public void handleOpened(java.nio.channels.Channel channel) {
XnioChannel c = XnioChannelRegistry.getChannel(channel);
fireChannelOpen(c);
if (c.isBound()) {
fireChannelBound(c, c.getLocalAddress());
if (c.isConnected()) {
fireChannelConnected(c, c.getRemoteAddress());
}
}
// Start to read.
resumeRead(channel);
}
}

View File

@ -1,158 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.xnio;
import static org.jboss.netty.channel.Channels.*;
import java.nio.channels.AlreadyConnectedException;
import java.nio.channels.ConnectionPendingException;
import java.nio.channels.GatheringByteChannel;
import org.jboss.netty.channel.AbstractChannelSink;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.xnio.FutureConnection;
import org.jboss.xnio.IoFuture;
import org.jboss.xnio.IoFuture.Notifier;
import org.jboss.xnio.channels.MultipointWritableMessageChannel;
import org.jboss.xnio.channels.SuspendableReadChannel;
import org.jboss.xnio.channels.SuspendableWriteChannel;
import org.jboss.xnio.channels.WritableMessageChannel;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$
*/
final class XnioClientChannelSink extends AbstractChannelSink {
private static final XnioClientChannelHandler HANDLER = new XnioClientChannelHandler();
XnioClientChannelSink() {
super();
}
@SuppressWarnings("unchecked")
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
BaseXnioChannel channel = (BaseXnioChannel) e.getChannel();
if (e instanceof ChannelStateEvent) {
ChannelStateEvent event = (ChannelStateEvent) e;
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
channel.closeNow(future);
}
break;
case BOUND:
case CONNECTED:
if (value != null) {
if (channel instanceof XnioClientChannel) {
final XnioClientChannel cc = (XnioClientChannel) channel;
synchronized (cc.connectLock) {
if (cc.connecting) {
Exception cause = new ConnectionPendingException();
future.setFailure(cause);
fireExceptionCaught(channel, cause);
} else {
cc.connecting = true;
java.nio.channels.Channel xnioChannel = cc.xnioChannel;
if (xnioChannel == null) {
FutureConnection fc =
cc.xnioConnector.connectTo(value, HANDLER);
fc.addNotifier(new FutureConnectionNotifier(cc), future);
} else {
Exception cause = new AlreadyConnectedException();
future.setFailure(cause);
fireExceptionCaught(cc, cause);
}
}
}
} else {
Exception cause = new UnsupportedOperationException();
future.setFailure(cause);
fireExceptionCaught(channel, cause);
}
} else {
channel.closeNow(future);
}
break;
case INTEREST_OPS:
int interestOps = ((Integer) value).intValue();
java.nio.channels.Channel xnioChannel = channel.xnioChannel;
if (xnioChannel instanceof SuspendableReadChannel) {
if ((interestOps & Channel.OP_READ) == 0) {
((SuspendableReadChannel) xnioChannel).suspendReads();
channel.setRawInterestOpsNow(Channel.OP_NONE);
} else {
((SuspendableReadChannel) xnioChannel).resumeReads();
channel.setRawInterestOpsNow(Channel.OP_READ);
}
}
e.getFuture().setSuccess();
break;
}
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
java.nio.channels.Channel xnioChannel = channel.xnioChannel;
if (xnioChannel instanceof GatheringByteChannel ||
xnioChannel instanceof MultipointWritableMessageChannel ||
xnioChannel instanceof WritableMessageChannel) {
boolean offered = channel.writeBuffer.offer(event);
assert offered;
if (xnioChannel instanceof SuspendableWriteChannel) {
((SuspendableWriteChannel) xnioChannel).resumeWrites();
}
} else {
event.getFuture().setFailure(new IllegalStateException());
}
}
}
@SuppressWarnings("unchecked")
private static final class FutureConnectionNotifier implements Notifier {
private final XnioClientChannel cc;
FutureConnectionNotifier(XnioClientChannel cc) {
this.cc = cc;
}
public void notify(IoFuture future, Object attachment) {
ChannelFuture cf = (ChannelFuture) attachment;
try {
java.nio.channels.Channel xnioChannel = (java.nio.channels.Channel) future.get();
cc.xnioChannel = xnioChannel;
XnioChannelRegistry.registerChannelMapping(cc);
cf.setSuccess();
} catch (Throwable t) {
cf.setFailure(t);
fireExceptionCaught(cc, t);
} finally {
cc.connecting = false;
}
}
}
}

View File

@ -1,30 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.xnio;
import org.jboss.netty.channel.ServerChannel;
/**
* A {@link ServerChannel} which uses
* <a href="http://www.jboss.org/xnio/">JBoss XNIO</a> as its I/O provider.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$
*/
public interface XnioServerChannel extends XnioChannel, ServerChannel {
XnioServerChannelFactory getFactory();
}

View File

@ -1,55 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.xnio;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ServerChannelFactory;
import org.jboss.xnio.channels.BoundServer;
/**
* A {@link ServerChannelFactory} which uses
* <a href="http://www.jboss.org/xnio/">JBoss XNIO</a> as its I/O provider.
* <p>
* Please note that you must specify an {@link XnioAcceptedChannelHandlerFactory}
* when you create a {@link BoundServer} to integrate XNIO into Netty.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$
*/
@SuppressWarnings("unchecked")
public class XnioServerChannelFactory implements ServerChannelFactory {
private final BoundServer xnioServer;
final XnioServerChannelSink sink;
public XnioServerChannelFactory(BoundServer xnioServer) {
if (xnioServer == null) {
throw new NullPointerException("xnioServer");
}
this.xnioServer = xnioServer;
sink = new XnioServerChannelSink();
}
public XnioServerChannel newChannel(ChannelPipeline pipeline) {
return new DefaultXnioServerChannel(this, pipeline, sink, xnioServer);
}
public void releaseExternalResources() {
// Nothing to release.
}
}

View File

@ -1,79 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.xnio;
import java.net.SocketAddress;
import org.jboss.netty.channel.AbstractChannelSink;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$
*/
final class XnioServerChannelSink extends AbstractChannelSink {
private final XnioClientChannelSink acceptedChannelSink = new XnioClientChannelSink();
XnioServerChannelSink() {
super();
}
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
Channel channel = e.getChannel();
if (channel instanceof DefaultXnioServerChannel) {
handleServerSocket(e);
} else if (channel instanceof XnioChannel) {
acceptedChannelSink.eventSunk(pipeline, e);
} else {
throw new Error("should not reach here");
}
}
private void handleServerSocket(ChannelEvent e) {
if (!(e instanceof ChannelStateEvent)) {
return;
}
ChannelStateEvent event = (ChannelStateEvent) e;
DefaultXnioServerChannel channel = (DefaultXnioServerChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
channel.closeNow(future);
}
break;
case BOUND:
if (value != null) {
channel.bindNow(future, (SocketAddress) value);
} else {
channel.closeNow(future);
}
break;
}
}
}

View File

@ -1,23 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* An alternative channel API implementation which uses
* <a href="http://www.jboss.org/xnio/">JBoss XNIO</a> as its I/O provider.
*
* @apiviz.exclude Channel$
*/
package org.jboss.netty.channel.xnio;