Package was renamed. See #396

This commit is contained in:
Norman Maurer 2012-06-16 22:31:43 +02:00
parent 70c4f59c45
commit 5d1e710adc
8 changed files with 0 additions and 1114 deletions

View File

@ -1,168 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio2;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousChannel;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public abstract class AbstractAsyncChannel extends AbstractChannel {
protected volatile AsynchronousChannel ch;
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
protected ChannelFuture connectFuture;
protected ScheduledFuture<?> connectTimeoutFuture;
private ConnectException connectTimeoutException;
protected AbstractAsyncChannel(Channel parent, Integer id) {
super(parent, id);
}
@Override
public InetSocketAddress localAddress() {
if (ch == null) {
return null;
}
return (InetSocketAddress) super.localAddress();
}
@Override
public InetSocketAddress remoteAddress() {
if (ch == null) {
return null;
}
return (InetSocketAddress) super.remoteAddress();
}
protected AsynchronousChannel javaChannel() {
return ch;
}
@Override
public boolean isOpen() {
return ch == null || ch.isOpen();
}
@Override
protected void doDeregister() throws Exception {
// NOOP
}
@Override
protected AsyncUnsafe newUnsafe() {
return new AsyncUnsafe();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof AsyncChildEventLoop;
}
protected class AsyncUnsafe extends AbstractUnsafe {
@Override
public void connect(final SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
if (!ensureOpen(future)) {
return;
}
try {
if (connectFuture != null) {
throw new IllegalStateException("connection attempt already made");
}
connectFuture = future;
doConnect(remoteAddress, localAddress, future);
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
if (connectTimeoutException == null) {
connectTimeoutException = new ConnectException("connection timed out");
}
ChannelFuture connectFuture = AbstractAsyncChannel.this.connectFuture;
if (connectFuture != null &&
connectFuture.setFailure(connectTimeoutException)) {
pipeline().fireExceptionCaught(connectTimeoutException);
close(voidFuture());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
} catch (Throwable t) {
future.setFailure(t);
pipeline().fireExceptionCaught(t);
closeIfClosed();
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
connect(remoteAddress, localAddress, future);
}
});
}
}
protected final void connectFailed(Throwable t) {
connectFuture.setFailure(t);
pipeline().fireExceptionCaught(t);
closeIfClosed();
}
protected final void connectSuccess() {
assert eventLoop().inEventLoop();
assert connectFuture != null;
try {
boolean wasActive = isActive();
connectFuture.setSuccess();
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
} catch (Throwable t) {
connectFuture.setFailure(t);
pipeline().fireExceptionCaught(t);
closeIfClosed();
} finally {
connectTimeoutFuture.cancel(false);
connectFuture = null;
}
}
}
protected abstract void doConnect(SocketAddress remoteAddress,
SocketAddress localAddress, ChannelFuture connectFuture);
}

View File

@ -1,51 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio2;
import io.netty.channel.SingleThreadEventLoop;
import java.util.concurrent.ThreadFactory;
final class AsyncChildEventLoop extends SingleThreadEventLoop {
AsyncChildEventLoop(ThreadFactory threadFactory) {
super(threadFactory);
}
@Override
protected void run() {
for (;;) {
Runnable task;
try {
task = takeTask();
task.run();
} catch (InterruptedException e) {
// Waken up by interruptThread()
}
if (isShutdown() && peekTask() == null) {
break;
}
}
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop) {
interruptThread();
}
}
}

View File

@ -1,41 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio2;
import io.netty.channel.EventExecutor;
import io.netty.channel.MultithreadEventLoop;
import java.util.concurrent.ThreadFactory;
public class AsyncEventLoop extends MultithreadEventLoop {
public AsyncEventLoop() {
this(0);
}
public AsyncEventLoop(int nThreads) {
this(nThreads, null);
}
public AsyncEventLoop(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
}
@Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
return new AsyncChildEventLoop(threadFactory);
}
}

View File

@ -1,140 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio2;
import io.netty.buffer.ChannelBufType;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ServerChannel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class AsyncServerSocketChannel extends AbstractAsyncChannel implements ServerChannel {
private static final AcceptHandler ACCEPT_HANDLER = new AcceptHandler();
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(AsyncServerSocketChannel.class);
private volatile AsyncServerSocketChannelConfig config;
public AsyncServerSocketChannel() {
super(null, null);
}
@Override
protected AsynchronousServerSocketChannel javaChannel() {
return (AsynchronousServerSocketChannel) super.javaChannel();
}
@Override
public boolean isActive() {
AsynchronousServerSocketChannel channel = javaChannel();
try {
if (channel != null && channel.getLocalAddress() != null) {
return true;
}
} catch (IOException e) {
return true;
}
return false;
}
@Override
public ChannelBufType bufferType() {
return ChannelBufType.MESSAGE;
}
@Override
protected SocketAddress localAddress0() {
try {
return javaChannel().getLocalAddress();
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
protected SocketAddress remoteAddress0() {
return null;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().bind(localAddress);
javaChannel().accept(this, ACCEPT_HANDLER);
}
@Override
protected void doClose() throws Exception {
javaChannel().close();
}
@Override
protected boolean isFlushPending() {
return false;
}
@Override
protected void doConnect(
SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
future.setFailure(new UnsupportedOperationException());
}
@Override
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected Runnable doRegister() throws Exception {
ch = AsynchronousServerSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop()));
config = new AsyncServerSocketChannelConfig(javaChannel());
return null;
}
private static final class AcceptHandler
implements CompletionHandler<AsynchronousSocketChannel, AsyncServerSocketChannel> {
public void completed(AsynchronousSocketChannel ch, AsyncServerSocketChannel channel) {
// register again this handler to accept new connections
channel.javaChannel().accept(channel, this);
// create the socket add it to the buffer and fire the event
channel.pipeline().inboundMessageBuffer().add(new AsyncSocketChannel(channel, null, ch));
channel.pipeline().fireInboundBufferUpdated();
}
public void failed(Throwable t, AsyncServerSocketChannel channel) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
}
}
@Override
public AsyncServerSocketChannelConfig config() {
if (config == null) {
throw new IllegalStateException("Channel not registered yet");
}
return config;
}
}

View File

@ -1,138 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio2;
import static io.netty.channel.ChannelOption.*;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.socket.ServerSocketChannelConfig;
import java.io.IOException;
import java.net.StandardSocketOptions;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.Map;
/**
* The Async {@link ServerSocketChannelConfig} implementation.
*/
public class AsyncServerSocketChannelConfig extends DefaultChannelConfig
implements ServerSocketChannelConfig {
private final AsynchronousServerSocketChannel channel;
private volatile int backlog;
/**
* Creates a new instance.
*/
public AsyncServerSocketChannelConfig(AsynchronousServerSocketChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
}
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG);
}
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_RCVBUF) {
return (T) Integer.valueOf(getReceiveBufferSize());
}
if (option == SO_REUSEADDR) {
return (T) Boolean.valueOf(isReuseAddress());
}
if (option == SO_BACKLOG) {
return (T) Integer.valueOf(getBacklog());
}
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == SO_RCVBUF) {
setReceiveBufferSize((Integer) value);
} else if (option == SO_REUSEADDR) {
setReuseAddress((Boolean) value);
} else if (option == SO_BACKLOG) {
setBacklog((Integer) value);
} else {
return super.setOption(option, value);
}
return true;
}
@Override
public boolean isReuseAddress() {
try {
return channel.getOption(StandardSocketOptions.SO_REUSEADDR);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setReuseAddress(boolean reuseAddress) {
try {
channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public int getReceiveBufferSize() {
try {
return channel.getOption(StandardSocketOptions.SO_RCVBUF);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setReceiveBufferSize(int receiveBufferSize) {
try {
channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
throw new UnsupportedOperationException();
}
@Override
public int getBacklog() {
return backlog;
}
@Override
public void setBacklog(int backlog) {
if (backlog < 0) {
throw new IllegalArgumentException("backlog: " + backlog);
}
this.backlog = backlog;
}
}

View File

@ -1,237 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio2;
import static io.netty.channel.ChannelOption.*;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.socket.SocketChannelConfig;
import java.io.IOException;
import java.net.StandardSocketOptions;
import java.nio.channels.NetworkChannel;
import java.util.Map;
/**
* The default {@link SocketChannelConfig} implementation.
*/
public class AsyncSocketChannelConfig extends DefaultChannelConfig
implements SocketChannelConfig {
private final NetworkChannel channel;
/**
* Creates a new instance.
*/
public AsyncSocketChannelConfig(NetworkChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
}
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(
super.getOptions(),
SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS);
}
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_RCVBUF) {
return (T) Integer.valueOf(getReceiveBufferSize());
}
if (option == SO_SNDBUF) {
return (T) Integer.valueOf(getSendBufferSize());
}
if (option == TCP_NODELAY) {
return (T) Boolean.valueOf(isTcpNoDelay());
}
if (option == SO_KEEPALIVE) {
return (T) Boolean.valueOf(isKeepAlive());
}
if (option == SO_REUSEADDR) {
return (T) Boolean.valueOf(isReuseAddress());
}
if (option == SO_LINGER) {
return (T) Integer.valueOf(getSoLinger());
}
if (option == IP_TOS) {
return (T) Integer.valueOf(getTrafficClass());
}
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == SO_RCVBUF) {
setReceiveBufferSize((Integer) value);
} else if (option == SO_SNDBUF) {
setSendBufferSize((Integer) value);
} else if (option == TCP_NODELAY) {
setTcpNoDelay((Boolean) value);
} else if (option == SO_KEEPALIVE) {
setKeepAlive((Boolean) value);
} else if (option == SO_REUSEADDR) {
setReuseAddress((Boolean) value);
} else if (option == SO_LINGER) {
setSoLinger((Integer) value);
} else if (option == IP_TOS) {
setTrafficClass((Integer) value);
} else {
return super.setOption(option, value);
}
return true;
}
@Override
public int getReceiveBufferSize() {
try {
return (int) channel.getOption(StandardSocketOptions.SO_RCVBUF);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public int getSendBufferSize() {
try {
return channel.getOption(StandardSocketOptions.SO_SNDBUF);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public int getSoLinger() {
try {
return channel.getOption(StandardSocketOptions.SO_LINGER);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public int getTrafficClass() {
try {
return channel.getOption(StandardSocketOptions.IP_TOS);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public boolean isKeepAlive() {
try {
return channel.getOption(StandardSocketOptions.SO_KEEPALIVE);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public boolean isReuseAddress() {
try {
return channel.getOption(StandardSocketOptions.SO_REUSEADDR);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public boolean isTcpNoDelay() {
try {
return channel.getOption(StandardSocketOptions.SO_REUSEADDR);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setKeepAlive(boolean keepAlive) {
try {
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, keepAlive);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setPerformancePreferences(
int connectionTime, int latency, int bandwidth) {
throw new UnsupportedOperationException();
}
@Override
public void setReceiveBufferSize(int receiveBufferSize) {
try {
channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setReuseAddress(boolean reuseAddress) {
try {
channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setSendBufferSize(int sendBufferSize) {
try {
channel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setSoLinger(int soLinger) {
try {
channel.setOption(StandardSocketOptions.SO_LINGER, soLinger);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setTcpNoDelay(boolean tcpNoDelay) {
try {
channel.setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay);
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public void setTrafficClass(int trafficClass) {
try {
channel.setOption(StandardSocketOptions.IP_TOS, trafficClass);
} catch (IOException e) {
throw new ChannelException(e);
}
}
}

View File

@ -1,318 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBufType;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelStateHandler;
import io.netty.channel.ChannelStateHandlerAdapter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.atomic.AtomicBoolean;
public class AsyncSocketChannel extends AbstractAsyncChannel {
private static final CompletionHandler<Void, AsyncSocketChannel> CONNECT_HANDLER = new ConnectHandler();
private static final CompletionHandler<Integer, AsyncSocketChannel> READ_HANDLER = new ReadHandler();
private static final CompletionHandler<Integer, AsyncSocketChannel> WRITE_HANDLER = new WriteHandler();
private static final ChannelStateHandler READ_START_HANDLER = new ChannelStateHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
try {
super.channelActive(ctx);
// once the channel is active, the first read is scheduled
AsyncSocketChannel.read((AsyncSocketChannel)ctx.channel());
} finally {
ctx.pipeline().remove(this);
}
}
};
private final AtomicBoolean flushing = new AtomicBoolean(false);
private volatile AsyncSocketChannelConfig config;
public AsyncSocketChannel() {
this(null, null, null);
}
public AsyncSocketChannel(AsyncServerSocketChannel parent, Integer id, AsynchronousSocketChannel channel) {
super(parent, id);
this.ch = channel;
if (ch != null) {
config = new AsyncSocketChannelConfig(javaChannel());
pipeline().addLast(READ_START_HANDLER);
}
}
@Override
public boolean isActive() {
AsynchronousSocketChannel ch = javaChannel();
return ch.isOpen() && remoteAddress() != null;
}
@Override
protected AsynchronousSocketChannel javaChannel() {
return (AsynchronousSocketChannel) super.javaChannel();
}
@Override
public ChannelBufType bufferType() {
return ChannelBufType.BYTE;
}
@Override
protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, final ChannelFuture future) {
assert ch != null;
if (localAddress != null) {
try {
javaChannel().bind(localAddress);
} catch (IOException e) {
future.setFailure(e);
return;
}
}
javaChannel().connect(remoteAddress, this, CONNECT_HANDLER);
}
@Override
protected InetSocketAddress localAddress0() {
try {
return (InetSocketAddress) javaChannel().getLocalAddress();
} catch (IOException e) {
return null;
}
}
@Override
protected InetSocketAddress remoteAddress0() {
try {
return (InetSocketAddress) javaChannel().getRemoteAddress();
} catch (IOException e) {
return null;
}
}
@Override
protected Runnable doRegister() throws Exception {
if (ch == null) {
ch = AsynchronousSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop()));
config = new AsyncSocketChannelConfig(javaChannel());
pipeline().addLast(READ_START_HANDLER);
}
return null;
}
/**
* Trigger a read from the {@link AsyncSocketChannel}
*
*/
private static void read(AsyncSocketChannel channel) {
ByteBuf byteBuf = channel.pipeline().inboundByteBuffer();
expandReadBuffer(byteBuf);
// Get a ByteBuffer view on the ByteBuf and clear it before try to read
ByteBuffer buffer = byteBuf.nioBuffer();
buffer.clear();
channel.javaChannel().read(buffer, channel, READ_HANDLER);
}
private static boolean expandReadBuffer(ByteBuf byteBuf) {
if (!byteBuf.writable()) {
// FIXME: Magic number
byteBuf.ensureWritableBytes(4096);
return true;
}
return false;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().bind(localAddress);
}
@Override
protected void doDisconnect() throws Exception {
doClose();
}
@Override
protected void doClose() throws Exception {
javaChannel().close();
}
@Override
protected boolean isFlushPending() {
return false;
}
@Override
protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception {
// Only one pending write can be scheduled at one time. Otherwise
// a PendingWriteException will be thrown. So use CAS to not run
// into this
if (flushing.compareAndSet(false, true)) {
ByteBuffer buffer = (ByteBuffer)buf.nioBuffer();
javaChannel().write(buffer, this, WRITE_HANDLER);
}
return false;
}
private static final class WriteHandler implements CompletionHandler<Integer, AsyncSocketChannel> {
@Override
public void completed(Integer result, AsyncSocketChannel channel) {
ByteBuf buf = channel.pipeline().outboundByteBuffer();
if (result > 0) {
if (result < buf.readableBytes()) {
// Update the readerIndex with the amount of read bytes
buf.readerIndex(buf.readerIndex() + result);
} else {
// not enough space in the buffer anymore so discard everything that
// was read already
buf.discardReadBytes();
}
channel.notifyFlushFutures();
}
// Allow to have the next write pending
channel.flushing.set(false);
}
@Override
public void failed(Throwable cause, AsyncSocketChannel channel) {
ByteBuf buf = channel.pipeline().outboundByteBuffer();
if (!buf.readable()) {
buf.discardReadBytes();
}
channel.notifyFlushFutures(cause);
channel.pipeline().fireExceptionCaught(cause);
if (cause instanceof IOException) {
channel.close(channel.unsafe().voidFuture());
}
// Allow to have the next write pending
channel.flushing.set(false);
}
}
private static final class ReadHandler implements CompletionHandler<Integer, AsyncSocketChannel> {
@Override
public void completed(Integer result, AsyncSocketChannel channel) {
assert channel.eventLoop().inEventLoop();
final ChannelPipeline pipeline = channel.pipeline();
boolean closed = false;
boolean read = false;
try {
int localReadAmount = result.intValue();
if (localReadAmount > 0) {
//Set the writerIndex of the buffer correctly to the
// current writerIndex + read amount of bytes.
//
// This is needed as the ByteBuffer and the ByteBuf does not share
// each others index
final ByteBuf byteBuf = pipeline.inboundByteBuffer();
byteBuf.writerIndex(byteBuf.writerIndex() + result);
read = true;
} else if (localReadAmount < 0) {
closed = true;
}
} catch (Throwable t) {
if (read) {
read = false;
pipeline.fireInboundBufferUpdated();
}
pipeline.fireExceptionCaught(t);
if (t instanceof IOException) {
channel.close(channel.unsafe().voidFuture());
}
} finally {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (closed && channel.isOpen()) {
channel.close(channel.unsafe().voidFuture());
} else {
// start the next read
AsyncSocketChannel.read(channel);
}
}
}
@Override
public void failed(Throwable t, AsyncSocketChannel channel) {
channel.pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
channel.close(channel.unsafe().voidFuture());
} else {
// start the next read
AsyncSocketChannel.read(channel);
}
}
}
private static final class ConnectHandler implements CompletionHandler<Void, AsyncSocketChannel> {
@Override
public void completed(Void result, AsyncSocketChannel channel) {
((AsyncUnsafe) channel.unsafe()).connectSuccess();
// start reading from channel
AsyncSocketChannel.read(channel);
}
@Override
public void failed(Throwable exc, AsyncSocketChannel channel) {
((AsyncUnsafe) channel.unsafe()).connectFailed(exc);
}
}
@Override
public AsyncSocketChannelConfig config() {
if (config == null) {
throw new IllegalStateException("Channel not open yet");
}
return config;
}
}

View File

@ -1,21 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* <a href="http://en.wikipedia.org/wiki/New_I/O">NIO2</a>-based socket channel
* API implementation - recommended for a large number of connections (&gt;= 1000).
*/
package io.netty.channel.socket.nio2;