From e1cbcd645686c8c0196ce94b3fdc1b0e3f781a46 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sun, 1 Jul 2012 21:50:17 +0200 Subject: [PATCH] Remove left-overs. See #396 --- .../socket/aio/AbstractAsyncChannel.java | 168 --------- .../socket/aio/AsyncChildEventLoop.java | 51 --- .../channel/socket/aio/AsyncEventLoop.java | 41 --- .../socket/aio/AsyncServerSocketChannel.java | 140 -------- .../aio/AsyncServerSocketChannelConfig.java | 138 -------- .../socket/aio/AsyncSocketChannel.java | 318 ------------------ .../socket/aio/AsyncSocketChannelConfig.java | 237 ------------- 7 files changed, 1093 deletions(-) delete mode 100755 transport/src/main/java/io/netty/channel/socket/aio/AbstractAsyncChannel.java delete mode 100755 transport/src/main/java/io/netty/channel/socket/aio/AsyncChildEventLoop.java delete mode 100755 transport/src/main/java/io/netty/channel/socket/aio/AsyncEventLoop.java delete mode 100755 transport/src/main/java/io/netty/channel/socket/aio/AsyncServerSocketChannel.java delete mode 100755 transport/src/main/java/io/netty/channel/socket/aio/AsyncServerSocketChannelConfig.java delete mode 100755 transport/src/main/java/io/netty/channel/socket/aio/AsyncSocketChannel.java delete mode 100755 transport/src/main/java/io/netty/channel/socket/aio/AsyncSocketChannelConfig.java diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAsyncChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAsyncChannel.java deleted file mode 100755 index c32e1cf9bc..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAsyncChannel.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.aio; - -import io.netty.channel.AbstractChannel; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.EventLoop; - -import java.net.ConnectException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.AsynchronousChannel; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -public abstract class AbstractAsyncChannel extends AbstractChannel { - - protected volatile AsynchronousChannel ch; - - /** - * The future of the current connection attempt. If not null, subsequent - * connection attempts will fail. - */ - protected ChannelFuture connectFuture; - protected ScheduledFuture connectTimeoutFuture; - private ConnectException connectTimeoutException; - - protected AbstractAsyncChannel(Channel parent, Integer id) { - super(parent, id); - } - - - @Override - public InetSocketAddress localAddress() { - if (ch == null) { - return null; - } - return (InetSocketAddress) super.localAddress(); - } - - @Override - public InetSocketAddress remoteAddress() { - if (ch == null) { - return null; - } - return (InetSocketAddress) super.remoteAddress(); - } - - protected AsynchronousChannel javaChannel() { - return ch; - } - - - @Override - public boolean isOpen() { - return ch == null || ch.isOpen(); - } - - @Override - protected void doDeregister() throws Exception { - // NOOP - } - - @Override - protected AsyncUnsafe newUnsafe() { - return new AsyncUnsafe(); - } - - @Override - protected boolean isCompatible(EventLoop loop) { - return loop instanceof AsyncChildEventLoop; - } - - protected class AsyncUnsafe extends AbstractUnsafe { - - @Override - public void connect(final SocketAddress remoteAddress, - final SocketAddress localAddress, final ChannelFuture future) { - if (eventLoop().inEventLoop()) { - if (!ensureOpen(future)) { - return; - } - - try { - if (connectFuture != null) { - throw new IllegalStateException("connection attempt already made"); - } - connectFuture = future; - - doConnect(remoteAddress, localAddress, future); - - // Schedule connect timeout. - int connectTimeoutMillis = config().getConnectTimeoutMillis(); - if (connectTimeoutMillis > 0) { - connectTimeoutFuture = eventLoop().schedule(new Runnable() { - @Override - public void run() { - if (connectTimeoutException == null) { - connectTimeoutException = new ConnectException("connection timed out"); - } - ChannelFuture connectFuture = AbstractAsyncChannel.this.connectFuture; - if (connectFuture != null && - connectFuture.setFailure(connectTimeoutException)) { - pipeline().fireExceptionCaught(connectTimeoutException); - close(voidFuture()); - } - } - }, connectTimeoutMillis, TimeUnit.MILLISECONDS); - } - - } catch (Throwable t) { - future.setFailure(t); - pipeline().fireExceptionCaught(t); - closeIfClosed(); - } - } else { - eventLoop().execute(new Runnable() { - @Override - public void run() { - connect(remoteAddress, localAddress, future); - } - }); - } - } - - protected final void connectFailed(Throwable t) { - connectFuture.setFailure(t); - pipeline().fireExceptionCaught(t); - closeIfClosed(); - } - - protected final void connectSuccess() { - assert eventLoop().inEventLoop(); - assert connectFuture != null; - try { - boolean wasActive = isActive(); - connectFuture.setSuccess(); - if (!wasActive && isActive()) { - pipeline().fireChannelActive(); - } - } catch (Throwable t) { - connectFuture.setFailure(t); - pipeline().fireExceptionCaught(t); - closeIfClosed(); - } finally { - connectTimeoutFuture.cancel(false); - connectFuture = null; - } - } - } - protected abstract void doConnect(SocketAddress remoteAddress, - SocketAddress localAddress, ChannelFuture connectFuture); - -} diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AsyncChildEventLoop.java b/transport/src/main/java/io/netty/channel/socket/aio/AsyncChildEventLoop.java deleted file mode 100755 index aad21eb795..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/aio/AsyncChildEventLoop.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.aio; - -import io.netty.channel.SingleThreadEventLoop; - -import java.util.concurrent.ThreadFactory; - -final class AsyncChildEventLoop extends SingleThreadEventLoop { - - AsyncChildEventLoop(ThreadFactory threadFactory) { - super(threadFactory); - } - - @Override - protected void run() { - for (;;) { - Runnable task; - try { - task = takeTask(); - task.run(); - } catch (InterruptedException e) { - // Waken up by interruptThread() - } - - if (isShutdown() && peekTask() == null) { - break; - } - } - } - - @Override - protected void wakeup(boolean inEventLoop) { - if (!inEventLoop) { - interruptThread(); - } - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AsyncEventLoop.java b/transport/src/main/java/io/netty/channel/socket/aio/AsyncEventLoop.java deleted file mode 100755 index 55dbba1f32..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/aio/AsyncEventLoop.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.aio; - -import io.netty.channel.EventExecutor; -import io.netty.channel.MultithreadEventLoop; - -import java.util.concurrent.ThreadFactory; - -public class AsyncEventLoop extends MultithreadEventLoop { - - public AsyncEventLoop() { - this(0); - } - - public AsyncEventLoop(int nThreads) { - this(nThreads, null); - } - - public AsyncEventLoop(int nThreads, ThreadFactory threadFactory) { - super(nThreads, threadFactory); - } - - @Override - protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { - return new AsyncChildEventLoop(threadFactory); - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AsyncServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AsyncServerSocketChannel.java deleted file mode 100755 index a6ac81bf3a..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/aio/AsyncServerSocketChannel.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.aio; - -import io.netty.buffer.ChannelBufType; -import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ServerChannel; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; - -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.channels.AsynchronousChannelGroup; -import java.nio.channels.AsynchronousServerSocketChannel; -import java.nio.channels.AsynchronousSocketChannel; -import java.nio.channels.CompletionHandler; - -public class AsyncServerSocketChannel extends AbstractAsyncChannel implements ServerChannel { - - private static final AcceptHandler ACCEPT_HANDLER = new AcceptHandler(); - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(AsyncServerSocketChannel.class); - private volatile AsyncServerSocketChannelConfig config; - - public AsyncServerSocketChannel() { - super(null, null); - } - - - @Override - protected AsynchronousServerSocketChannel javaChannel() { - return (AsynchronousServerSocketChannel) super.javaChannel(); - } - - @Override - public boolean isActive() { - AsynchronousServerSocketChannel channel = javaChannel(); - try { - if (channel != null && channel.getLocalAddress() != null) { - return true; - } - } catch (IOException e) { - return true; - } - return false; - } - - @Override - public ChannelBufType bufferType() { - return ChannelBufType.MESSAGE; - } - - @Override - protected SocketAddress localAddress0() { - try { - return javaChannel().getLocalAddress(); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - protected SocketAddress remoteAddress0() { - return null; - } - - @Override - protected void doBind(SocketAddress localAddress) throws Exception { - javaChannel().bind(localAddress); - javaChannel().accept(this, ACCEPT_HANDLER); - - } - - @Override - protected void doClose() throws Exception { - javaChannel().close(); - } - - @Override - protected boolean isFlushPending() { - return false; - } - - @Override - protected void doConnect( - SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) { - future.setFailure(new UnsupportedOperationException()); - } - - @Override - protected void doDisconnect() throws Exception { - throw new UnsupportedOperationException(); - } - - @Override - protected Runnable doRegister() throws Exception { - ch = AsynchronousServerSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop())); - config = new AsyncServerSocketChannelConfig(javaChannel()); - - return null; - } - - private static final class AcceptHandler - implements CompletionHandler { - public void completed(AsynchronousSocketChannel ch, AsyncServerSocketChannel channel) { - // register again this handler to accept new connections - channel.javaChannel().accept(channel, this); - - // create the socket add it to the buffer and fire the event - channel.pipeline().inboundMessageBuffer().add(new AsyncSocketChannel(channel, null, ch)); - channel.pipeline().fireInboundBufferUpdated(); - } - - public void failed(Throwable t, AsyncServerSocketChannel channel) { - logger.warn("Failed to create a new channel from an accepted socket.", t); - } - } - - @Override - public AsyncServerSocketChannelConfig config() { - if (config == null) { - throw new IllegalStateException("Channel not registered yet"); - } - return config; - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AsyncServerSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/aio/AsyncServerSocketChannelConfig.java deleted file mode 100755 index 58dfff1219..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/aio/AsyncServerSocketChannelConfig.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.aio; - -import static io.netty.channel.ChannelOption.*; -import io.netty.channel.ChannelException; -import io.netty.channel.ChannelOption; -import io.netty.channel.DefaultChannelConfig; -import io.netty.channel.socket.ServerSocketChannelConfig; - -import java.io.IOException; -import java.net.StandardSocketOptions; -import java.nio.channels.AsynchronousServerSocketChannel; -import java.util.Map; - -/** - * The Async {@link ServerSocketChannelConfig} implementation. - */ -public class AsyncServerSocketChannelConfig extends DefaultChannelConfig - implements ServerSocketChannelConfig { - - private final AsynchronousServerSocketChannel channel; - private volatile int backlog; - - /** - * Creates a new instance. - */ - public AsyncServerSocketChannelConfig(AsynchronousServerSocketChannel channel) { - if (channel == null) { - throw new NullPointerException("channel"); - } - this.channel = channel; - } - - @Override - public Map, Object> getOptions() { - return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG); - } - - @Override - public T getOption(ChannelOption option) { - if (option == SO_RCVBUF) { - return (T) Integer.valueOf(getReceiveBufferSize()); - } - if (option == SO_REUSEADDR) { - return (T) Boolean.valueOf(isReuseAddress()); - } - if (option == SO_BACKLOG) { - return (T) Integer.valueOf(getBacklog()); - } - - return super.getOption(option); - } - - @Override - public boolean setOption(ChannelOption option, T value) { - validate(option, value); - - if (option == SO_RCVBUF) { - setReceiveBufferSize((Integer) value); - } else if (option == SO_REUSEADDR) { - setReuseAddress((Boolean) value); - } else if (option == SO_BACKLOG) { - setBacklog((Integer) value); - } else { - return super.setOption(option, value); - } - - return true; - } - - @Override - public boolean isReuseAddress() { - try { - return channel.getOption(StandardSocketOptions.SO_REUSEADDR); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public void setReuseAddress(boolean reuseAddress) { - try { - channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public int getReceiveBufferSize() { - try { - return channel.getOption(StandardSocketOptions.SO_RCVBUF); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public void setReceiveBufferSize(int receiveBufferSize) { - try { - channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) { - throw new UnsupportedOperationException(); - } - - @Override - public int getBacklog() { - return backlog; - } - - @Override - public void setBacklog(int backlog) { - if (backlog < 0) { - throw new IllegalArgumentException("backlog: " + backlog); - } - this.backlog = backlog; - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AsyncSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AsyncSocketChannel.java deleted file mode 100755 index f561cd1486..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/aio/AsyncSocketChannel.java +++ /dev/null @@ -1,318 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.aio; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ChannelBufType; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelStateHandler; -import io.netty.channel.ChannelStateHandlerAdapter; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousChannelGroup; -import java.nio.channels.AsynchronousSocketChannel; -import java.nio.channels.CompletionHandler; -import java.util.concurrent.atomic.AtomicBoolean; - -public class AsyncSocketChannel extends AbstractAsyncChannel { - - private static final CompletionHandler CONNECT_HANDLER = new ConnectHandler(); - private static final CompletionHandler READ_HANDLER = new ReadHandler(); - private static final CompletionHandler WRITE_HANDLER = new WriteHandler(); - private static final ChannelStateHandler READ_START_HANDLER = new ChannelStateHandlerAdapter() { - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - try { - super.channelActive(ctx); - - // once the channel is active, the first read is scheduled - AsyncSocketChannel.read((AsyncSocketChannel)ctx.channel()); - - } finally { - ctx.pipeline().remove(this); - } - - - } - - }; - private final AtomicBoolean flushing = new AtomicBoolean(false); - private volatile AsyncSocketChannelConfig config; - - public AsyncSocketChannel() { - this(null, null, null); - } - - public AsyncSocketChannel(AsyncServerSocketChannel parent, Integer id, AsynchronousSocketChannel channel) { - super(parent, id); - this.ch = channel; - if (ch != null) { - config = new AsyncSocketChannelConfig(javaChannel()); - pipeline().addLast(READ_START_HANDLER); - } - } - - @Override - public boolean isActive() { - AsynchronousSocketChannel ch = javaChannel(); - return ch.isOpen() && remoteAddress() != null; - } - - @Override - protected AsynchronousSocketChannel javaChannel() { - return (AsynchronousSocketChannel) super.javaChannel(); - } - - @Override - public ChannelBufType bufferType() { - return ChannelBufType.BYTE; - } - - @Override - protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, final ChannelFuture future) { - assert ch != null; - if (localAddress != null) { - try { - javaChannel().bind(localAddress); - } catch (IOException e) { - future.setFailure(e); - return; - } - } - - javaChannel().connect(remoteAddress, this, CONNECT_HANDLER); - } - - @Override - protected InetSocketAddress localAddress0() { - try { - return (InetSocketAddress) javaChannel().getLocalAddress(); - } catch (IOException e) { - return null; - } - } - - @Override - protected InetSocketAddress remoteAddress0() { - try { - return (InetSocketAddress) javaChannel().getRemoteAddress(); - } catch (IOException e) { - return null; - } - } - - @Override - protected Runnable doRegister() throws Exception { - if (ch == null) { - ch = AsynchronousSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop())); - config = new AsyncSocketChannelConfig(javaChannel()); - pipeline().addLast(READ_START_HANDLER); - } - - - return null; - } - - /** - * Trigger a read from the {@link AsyncSocketChannel} - * - */ - private static void read(AsyncSocketChannel channel) { - ByteBuf byteBuf = channel.pipeline().inboundByteBuffer(); - expandReadBuffer(byteBuf); - - // Get a ByteBuffer view on the ByteBuf and clear it before try to read - ByteBuffer buffer = byteBuf.nioBuffer(); - buffer.clear(); - channel.javaChannel().read(buffer, channel, READ_HANDLER); - } - - - private static boolean expandReadBuffer(ByteBuf byteBuf) { - if (!byteBuf.writable()) { - // FIXME: Magic number - byteBuf.ensureWritableBytes(4096); - return true; - } - return false; - } - - @Override - protected void doBind(SocketAddress localAddress) throws Exception { - javaChannel().bind(localAddress); - } - - @Override - protected void doDisconnect() throws Exception { - doClose(); - } - - @Override - protected void doClose() throws Exception { - javaChannel().close(); - } - - @Override - protected boolean isFlushPending() { - return false; - } - - @Override - protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception { - // Only one pending write can be scheduled at one time. Otherwise - // a PendingWriteException will be thrown. So use CAS to not run - // into this - if (flushing.compareAndSet(false, true)) { - ByteBuffer buffer = (ByteBuffer)buf.nioBuffer(); - javaChannel().write(buffer, this, WRITE_HANDLER); - } - return false; - } - - - private static final class WriteHandler implements CompletionHandler { - - @Override - public void completed(Integer result, AsyncSocketChannel channel) { - ByteBuf buf = channel.pipeline().outboundByteBuffer(); - - if (result > 0) { - if (result < buf.readableBytes()) { - // Update the readerIndex with the amount of read bytes - buf.readerIndex(buf.readerIndex() + result); - } else { - // not enough space in the buffer anymore so discard everything that - // was read already - buf.discardReadBytes(); - - } - channel.notifyFlushFutures(); - } - - // Allow to have the next write pending - channel.flushing.set(false); - } - - @Override - public void failed(Throwable cause, AsyncSocketChannel channel) { - ByteBuf buf = channel.pipeline().outboundByteBuffer(); - if (!buf.readable()) { - buf.discardReadBytes(); - } - - channel.notifyFlushFutures(cause); - channel.pipeline().fireExceptionCaught(cause); - if (cause instanceof IOException) { - channel.close(channel.unsafe().voidFuture()); - } - // Allow to have the next write pending - channel.flushing.set(false); - } - } - - private static final class ReadHandler implements CompletionHandler { - - @Override - public void completed(Integer result, AsyncSocketChannel channel) { - assert channel.eventLoop().inEventLoop(); - - final ChannelPipeline pipeline = channel.pipeline(); - boolean closed = false; - boolean read = false; - try { - - int localReadAmount = result.intValue(); - if (localReadAmount > 0) { - //Set the writerIndex of the buffer correctly to the - // current writerIndex + read amount of bytes. - // - // This is needed as the ByteBuffer and the ByteBuf does not share - // each others index - final ByteBuf byteBuf = pipeline.inboundByteBuffer(); - byteBuf.writerIndex(byteBuf.writerIndex() + result); - - read = true; - - } else if (localReadAmount < 0) { - closed = true; - } - - } catch (Throwable t) { - if (read) { - read = false; - pipeline.fireInboundBufferUpdated(); - } - pipeline.fireExceptionCaught(t); - if (t instanceof IOException) { - channel.close(channel.unsafe().voidFuture()); - } - } finally { - if (read) { - pipeline.fireInboundBufferUpdated(); - } - if (closed && channel.isOpen()) { - channel.close(channel.unsafe().voidFuture()); - } else { - // start the next read - AsyncSocketChannel.read(channel); - } - } - } - - @Override - public void failed(Throwable t, AsyncSocketChannel channel) { - channel.pipeline().fireExceptionCaught(t); - if (t instanceof IOException) { - channel.close(channel.unsafe().voidFuture()); - } else { - // start the next read - AsyncSocketChannel.read(channel); - } - } - } - - private static final class ConnectHandler implements CompletionHandler { - - @Override - public void completed(Void result, AsyncSocketChannel channel) { - ((AsyncUnsafe) channel.unsafe()).connectSuccess(); - - // start reading from channel - AsyncSocketChannel.read(channel); - } - - @Override - public void failed(Throwable exc, AsyncSocketChannel channel) { - ((AsyncUnsafe) channel.unsafe()).connectFailed(exc); - } - } - - @Override - public AsyncSocketChannelConfig config() { - if (config == null) { - throw new IllegalStateException("Channel not open yet"); - } - return config; - } - - -} diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AsyncSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/aio/AsyncSocketChannelConfig.java deleted file mode 100755 index 481a606d63..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/aio/AsyncSocketChannelConfig.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.aio; - -import static io.netty.channel.ChannelOption.*; -import io.netty.channel.ChannelException; -import io.netty.channel.ChannelOption; -import io.netty.channel.DefaultChannelConfig; -import io.netty.channel.socket.SocketChannelConfig; - -import java.io.IOException; -import java.net.StandardSocketOptions; -import java.nio.channels.NetworkChannel; -import java.util.Map; - -/** - * The default {@link SocketChannelConfig} implementation. - */ -public class AsyncSocketChannelConfig extends DefaultChannelConfig - implements SocketChannelConfig { - - private final NetworkChannel channel; - - /** - * Creates a new instance. - */ - public AsyncSocketChannelConfig(NetworkChannel channel) { - if (channel == null) { - throw new NullPointerException("channel"); - } - this.channel = channel; - } - - @Override - public Map, Object> getOptions() { - return getOptions( - super.getOptions(), - SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS); - } - - @Override - public T getOption(ChannelOption option) { - if (option == SO_RCVBUF) { - return (T) Integer.valueOf(getReceiveBufferSize()); - } - if (option == SO_SNDBUF) { - return (T) Integer.valueOf(getSendBufferSize()); - } - if (option == TCP_NODELAY) { - return (T) Boolean.valueOf(isTcpNoDelay()); - } - if (option == SO_KEEPALIVE) { - return (T) Boolean.valueOf(isKeepAlive()); - } - if (option == SO_REUSEADDR) { - return (T) Boolean.valueOf(isReuseAddress()); - } - if (option == SO_LINGER) { - return (T) Integer.valueOf(getSoLinger()); - } - if (option == IP_TOS) { - return (T) Integer.valueOf(getTrafficClass()); - } - - return super.getOption(option); - } - - @Override - public boolean setOption(ChannelOption option, T value) { - validate(option, value); - - if (option == SO_RCVBUF) { - setReceiveBufferSize((Integer) value); - } else if (option == SO_SNDBUF) { - setSendBufferSize((Integer) value); - } else if (option == TCP_NODELAY) { - setTcpNoDelay((Boolean) value); - } else if (option == SO_KEEPALIVE) { - setKeepAlive((Boolean) value); - } else if (option == SO_REUSEADDR) { - setReuseAddress((Boolean) value); - } else if (option == SO_LINGER) { - setSoLinger((Integer) value); - } else if (option == IP_TOS) { - setTrafficClass((Integer) value); - } else { - return super.setOption(option, value); - } - - return true; - } - - @Override - public int getReceiveBufferSize() { - try { - return (int) channel.getOption(StandardSocketOptions.SO_RCVBUF); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public int getSendBufferSize() { - try { - return channel.getOption(StandardSocketOptions.SO_SNDBUF); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public int getSoLinger() { - try { - return channel.getOption(StandardSocketOptions.SO_LINGER); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public int getTrafficClass() { - try { - return channel.getOption(StandardSocketOptions.IP_TOS); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public boolean isKeepAlive() { - try { - return channel.getOption(StandardSocketOptions.SO_KEEPALIVE); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public boolean isReuseAddress() { - try { - return channel.getOption(StandardSocketOptions.SO_REUSEADDR); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public boolean isTcpNoDelay() { - try { - return channel.getOption(StandardSocketOptions.SO_REUSEADDR); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public void setKeepAlive(boolean keepAlive) { - try { - channel.setOption(StandardSocketOptions.SO_KEEPALIVE, keepAlive); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public void setPerformancePreferences( - int connectionTime, int latency, int bandwidth) { - throw new UnsupportedOperationException(); - } - - @Override - public void setReceiveBufferSize(int receiveBufferSize) { - try { - channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public void setReuseAddress(boolean reuseAddress) { - try { - channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public void setSendBufferSize(int sendBufferSize) { - try { - channel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public void setSoLinger(int soLinger) { - try { - channel.setOption(StandardSocketOptions.SO_LINGER, soLinger); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public void setTcpNoDelay(boolean tcpNoDelay) { - try { - channel.setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay); - } catch (IOException e) { - throw new ChannelException(e); - } - } - - @Override - public void setTrafficClass(int trafficClass) { - try { - channel.setOption(StandardSocketOptions.IP_TOS, trafficClass); - } catch (IOException e) { - throw new ChannelException(e); - } - } -}