From 07803b3e2c463b9ad5c1b82c747cb04e7bffb4c0 Mon Sep 17 00:00:00 2001 From: pfisterer Date: Mon, 1 Aug 2011 10:07:34 +0200 Subject: [PATCH] Updated version of netty-iostream channels --- .../channel/iostream/IOStreamAddress.java | 48 +++++ .../channel/iostream/IOStreamChannel.java | 73 +++++++ .../iostream/IOStreamChannelFactory.java | 60 ++++++ .../channel/iostream/IOStreamChannelSink.java | 178 ++++++++++++++++++ .../netty/example/iostream/IOStream.java | 94 +++++++++ 5 files changed, 453 insertions(+) create mode 100755 src/main/java/org/jboss/netty/channel/iostream/IOStreamAddress.java create mode 100755 src/main/java/org/jboss/netty/channel/iostream/IOStreamChannel.java create mode 100755 src/main/java/org/jboss/netty/channel/iostream/IOStreamChannelFactory.java create mode 100755 src/main/java/org/jboss/netty/channel/iostream/IOStreamChannelSink.java create mode 100755 src/main/java/org/jboss/netty/example/iostream/IOStream.java diff --git a/src/main/java/org/jboss/netty/channel/iostream/IOStreamAddress.java b/src/main/java/org/jboss/netty/channel/iostream/IOStreamAddress.java new file mode 100755 index 0000000000..e85c90e939 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/iostream/IOStreamAddress.java @@ -0,0 +1,48 @@ +/* + * Copyright 2011 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.iostream; + +import java.io.InputStream; +import java.io.OutputStream; +import java.net.SocketAddress; + +/** + * A {@link java.net.SocketAddress} implementation holding an {@link java.io.InputStream} and an {@link java.io.OutputStream} instance used as + * "remote" address to connect to with a {@link IOStreamChannel}. + * + * @author Daniel Bimschas + * @author Dennis Pfisterer + */ +public class IOStreamAddress extends SocketAddress { + + private final InputStream inputStream; + + private final OutputStream outputStream; + + public IOStreamAddress(final InputStream inputStream, final OutputStream outputStream) { + + this.inputStream = inputStream; + this.outputStream = outputStream; + } + + public InputStream getInputStream() { + return inputStream; + } + + public OutputStream getOutputStream() { + return outputStream; + } +} diff --git a/src/main/java/org/jboss/netty/channel/iostream/IOStreamChannel.java b/src/main/java/org/jboss/netty/channel/iostream/IOStreamChannel.java new file mode 100755 index 0000000000..67bb4b7a97 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/iostream/IOStreamChannel.java @@ -0,0 +1,73 @@ +/* + * Copyright 2011 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.iostream; + + +import org.jboss.netty.channel.*; + +import java.net.SocketAddress; + +/** + * A channel to an {@link java.io.InputStream} and an {@link java.io.OutputStream}. + * + * @author Daniel Bimschas + * @author Dennis Pfisterer + */ +public class IOStreamChannel extends AbstractChannel { + + IOStreamChannel(final ChannelFactory factory, final ChannelPipeline pipeline, final ChannelSink sink) { + super(null, factory, pipeline, sink); + } + + @Override + public ChannelConfig getConfig() { + return ((IOStreamChannelSink) getPipeline().getSink()).getConfig(); + } + + @Override + public boolean isBound() { + return ((IOStreamChannelSink) getPipeline().getSink()).isBound(); + } + + @Override + public boolean isConnected() { + return ((IOStreamChannelSink) getPipeline().getSink()).isConnected(); + } + + @Override + public SocketAddress getLocalAddress() { + return null; + } + + @Override + public SocketAddress getRemoteAddress() { + return ((IOStreamChannelSink) getPipeline().getSink()).getRemoteAddress(); + } + + @Override + public ChannelFuture bind(final SocketAddress localAddress) { + throw new UnsupportedOperationException(); + } + + @Override + public ChannelFuture unbind() { + throw new UnsupportedOperationException(); + } + + void doSetClosed() { + setClosed(); + } +} diff --git a/src/main/java/org/jboss/netty/channel/iostream/IOStreamChannelFactory.java b/src/main/java/org/jboss/netty/channel/iostream/IOStreamChannelFactory.java new file mode 100755 index 0000000000..5b7c9548cd --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/iostream/IOStreamChannelFactory.java @@ -0,0 +1,60 @@ +/* + * Copyright 2011 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.iostream; + + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.group.ChannelGroup; +import org.jboss.netty.channel.group.ChannelGroupFuture; +import org.jboss.netty.channel.group.DefaultChannelGroup; +import org.jboss.netty.util.internal.ExecutorUtil; + +import java.util.concurrent.ExecutorService; + +/** + * A {@link org.jboss.netty.channel.ChannelFactory} for creating {@link IOStreamChannel} instances. + * + * @author Daniel Bimschas + * @author Dennis Pfisterer + */ +public class IOStreamChannelFactory implements ChannelFactory { + + private final ChannelGroup channels = new DefaultChannelGroup("IOStreamChannelFactory-ChannelGroup"); + + private final ExecutorService executorService; + + public IOStreamChannelFactory(ExecutorService executorService) { + this.executorService = executorService; + } + + @Override + public Channel newChannel(final ChannelPipeline pipeline) { + IOStreamChannelSink sink = new IOStreamChannelSink(executorService); + IOStreamChannel channel = new IOStreamChannel(this, pipeline, sink); + sink.setChannel(channel); + channels.add(channel); + return channel; + } + + @Override + public void releaseExternalResources() { + ChannelGroupFuture close = channels.close(); + close.awaitUninterruptibly(); + ExecutorUtil.terminate(executorService); + } +} diff --git a/src/main/java/org/jboss/netty/channel/iostream/IOStreamChannelSink.java b/src/main/java/org/jboss/netty/channel/iostream/IOStreamChannelSink.java new file mode 100755 index 0000000000..ecd14eb1d9 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/iostream/IOStreamChannelSink.java @@ -0,0 +1,178 @@ +/* + * Copyright 2011 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.iostream; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.*; + +import java.io.OutputStream; +import java.io.PushbackInputStream; +import java.util.concurrent.ExecutorService; + +import static org.jboss.netty.channel.Channels.*; + +/** + * A {@link org.jboss.netty.channel.ChannelSink} implementation which reads from an {@link java.io.InputStream} and + * writes to an {@link java.io.OutputStream}. + * + * @author Daniel Bimschas + * @author Dennis Pfisterer + */ +public class IOStreamChannelSink extends AbstractChannelSink { + + private static class ReadRunnable implements Runnable { + + private final IOStreamChannelSink channelSink; + + public ReadRunnable(final IOStreamChannelSink channelSink) { + this.channelSink = channelSink; + } + + @Override + public void run() { + + PushbackInputStream in = channelSink.inputStream; + + while (channelSink.channel.isOpen()) { + + byte[] buf; + int readBytes; + try { + int bytesToRead = in.available(); + if (bytesToRead > 0) { + buf = new byte[bytesToRead]; + readBytes = in.read(buf); + } else { + // peek into the stream if it was closed (value=-1) + int b = in.read(); + if (b < 0) { + break; + } + // push back the byte which was read too much + in.unread(b); + continue; + } + } catch (Throwable t) { + if (!channelSink.channel.getCloseFuture().isDone()) { + fireExceptionCaught(channelSink.channel, t); + } + break; + } + + fireMessageReceived(channelSink.channel, ChannelBuffers.wrappedBuffer(buf, 0, readBytes)); + } + + // Clean up. + close(channelSink.channel); + } + } + + private final ExecutorService executorService; + + private IOStreamChannel channel; + + public IOStreamChannelSink(final ExecutorService executorService) { + this.executorService = executorService; + } + + public boolean isConnected() { + return inputStream != null && outputStream != null; + } + + public IOStreamAddress getRemoteAddress() { + return remoteAddress; + } + + public boolean isBound() { + return false; + } + + public ChannelConfig getConfig() { + return config; + } + + public void setChannel(final IOStreamChannel channel) { + this.channel = channel; + } + + private IOStreamAddress remoteAddress; + + private OutputStream outputStream; + + private PushbackInputStream inputStream; + + private ChannelConfig config = new DefaultChannelConfig(); + + @Override + public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { + + final ChannelFuture future = e.getFuture(); + + if (e instanceof ChannelStateEvent) { + + final ChannelStateEvent stateEvent = (ChannelStateEvent) e; + final ChannelState state = stateEvent.getState(); + final Object value = stateEvent.getValue(); + + switch (state) { + + case OPEN: + if (Boolean.FALSE.equals(value)) { + outputStream = null; + inputStream = null; + ((IOStreamChannel) e.getChannel()).doSetClosed(); + } + break; + + case BOUND: + throw new UnsupportedOperationException(); + + case CONNECTED: + if (value != null) { + remoteAddress = (IOStreamAddress) value; + outputStream = remoteAddress.getOutputStream(); + inputStream = new PushbackInputStream(remoteAddress.getInputStream()); + executorService.execute(new ReadRunnable(this)); + future.setSuccess(); + } + break; + + case INTEREST_OPS: + // TODO implement + throw new UnsupportedOperationException(); + + } + + } else if (e instanceof MessageEvent) { + + final MessageEvent event = (MessageEvent) e; + if (event.getMessage() instanceof ChannelBuffer) { + + final ChannelBuffer buffer = (ChannelBuffer) event.getMessage(); + buffer.readBytes(outputStream, buffer.readableBytes()); + outputStream.flush(); + future.setSuccess(); + + } else { + throw new IllegalArgumentException( + "Only ChannelBuffer objects are supported to be written onto the IOStreamChannelSink! " + + "Please check if the encoder pipeline is configured correctly." + ); + } + } + } +} diff --git a/src/main/java/org/jboss/netty/example/iostream/IOStream.java b/src/main/java/org/jboss/netty/example/iostream/IOStream.java new file mode 100755 index 0000000000..1abc192d2c --- /dev/null +++ b/src/main/java/org/jboss/netty/example/iostream/IOStream.java @@ -0,0 +1,94 @@ +/* + * Copyright 2011 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.example.iostream; + +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.channel.*; +import org.jboss.netty.channel.iostream.IOStreamAddress; +import org.jboss.netty.channel.iostream.IOStreamChannelFactory; +import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; +import org.jboss.netty.handler.codec.frame.Delimiters; +import org.jboss.netty.handler.codec.string.StringDecoder; +import org.jboss.netty.handler.codec.string.StringEncoder; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * An example demonstrating the use of the {@link org.jboss.netty.channel.iostream.IOStreamChannel}. + * + * @author Daniel Bimschas + * @author Dennis Pfisterer + */ +public class IOStream { + + private static volatile boolean running = true; + + public static void main(String[] args) { + + final ExecutorService executorService = Executors.newCachedThreadPool(); + final ClientBootstrap bootstrap = new ClientBootstrap(new IOStreamChannelFactory(executorService)); + + // Configure the event pipeline factory. + bootstrap.setPipelineFactory(new ChannelPipelineFactory() { + public ChannelPipeline getPipeline() throws Exception { + DefaultChannelPipeline pipeline = new DefaultChannelPipeline(); + pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); + pipeline.addLast("decoder", new StringDecoder()); + pipeline.addLast("encoder", new StringEncoder()); + pipeline.addLast("loggingHandler", new SimpleChannelHandler() { + @Override + public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) + throws Exception { + + final String message = (String) e.getMessage(); + synchronized (System.out) { + e.getChannel().write("Message received: " + message); + } + if ("exit".equals(message)) { + IOStream.running = false; + } + super.messageReceived(ctx, e); + } + } + ); + return pipeline; + } + }); + + // Make a new connection. + ChannelFuture connectFuture = bootstrap.connect(new IOStreamAddress(System.in, System.out)); + + // Wait until the connection is made successfully. + Channel channel = connectFuture.awaitUninterruptibly().getChannel(); + + while (running) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + // Close the connection. + channel.close().awaitUninterruptibly(); + + // Shut down all thread pools to exit. + bootstrap.releaseExternalResources(); + + } + +}