Merge pull request #22 from pfisterer/master
UPDATED: Channel(Sink) implementation for connecting to InputStream and OutputStream objects
This commit is contained in:
commit
a017961168
48
src/main/java/org/jboss/netty/channel/iostream/IOStreamAddress.java
Executable file
48
src/main/java/org/jboss/netty/channel/iostream/IOStreamAddress.java
Executable file
@ -0,0 +1,48 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2011 Red Hat, Inc.
|
||||||
|
*
|
||||||
|
* Red Hat licenses this file to you under the Apache License, version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with the
|
||||||
|
* License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.jboss.netty.channel.iostream;
|
||||||
|
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A {@link java.net.SocketAddress} implementation holding an {@link java.io.InputStream} and an {@link java.io.OutputStream} instance used as
|
||||||
|
* "remote" address to connect to with a {@link IOStreamChannel}.
|
||||||
|
*
|
||||||
|
* @author Daniel Bimschas
|
||||||
|
* @author Dennis Pfisterer
|
||||||
|
*/
|
||||||
|
public class IOStreamAddress extends SocketAddress {
|
||||||
|
|
||||||
|
private final InputStream inputStream;
|
||||||
|
|
||||||
|
private final OutputStream outputStream;
|
||||||
|
|
||||||
|
public IOStreamAddress(final InputStream inputStream, final OutputStream outputStream) {
|
||||||
|
|
||||||
|
this.inputStream = inputStream;
|
||||||
|
this.outputStream = outputStream;
|
||||||
|
}
|
||||||
|
|
||||||
|
public InputStream getInputStream() {
|
||||||
|
return inputStream;
|
||||||
|
}
|
||||||
|
|
||||||
|
public OutputStream getOutputStream() {
|
||||||
|
return outputStream;
|
||||||
|
}
|
||||||
|
}
|
73
src/main/java/org/jboss/netty/channel/iostream/IOStreamChannel.java
Executable file
73
src/main/java/org/jboss/netty/channel/iostream/IOStreamChannel.java
Executable file
@ -0,0 +1,73 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2011 Red Hat, Inc.
|
||||||
|
*
|
||||||
|
* Red Hat licenses this file to you under the Apache License, version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with the
|
||||||
|
* License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.jboss.netty.channel.iostream;
|
||||||
|
|
||||||
|
|
||||||
|
import org.jboss.netty.channel.*;
|
||||||
|
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A channel to an {@link java.io.InputStream} and an {@link java.io.OutputStream}.
|
||||||
|
*
|
||||||
|
* @author Daniel Bimschas
|
||||||
|
* @author Dennis Pfisterer
|
||||||
|
*/
|
||||||
|
public class IOStreamChannel extends AbstractChannel {
|
||||||
|
|
||||||
|
IOStreamChannel(final ChannelFactory factory, final ChannelPipeline pipeline, final ChannelSink sink) {
|
||||||
|
super(null, factory, pipeline, sink);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelConfig getConfig() {
|
||||||
|
return ((IOStreamChannelSink) getPipeline().getSink()).getConfig();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isBound() {
|
||||||
|
return ((IOStreamChannelSink) getPipeline().getSink()).isBound();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isConnected() {
|
||||||
|
return ((IOStreamChannelSink) getPipeline().getSink()).isConnected();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SocketAddress getLocalAddress() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SocketAddress getRemoteAddress() {
|
||||||
|
return ((IOStreamChannelSink) getPipeline().getSink()).getRemoteAddress();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture bind(final SocketAddress localAddress) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture unbind() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
void doSetClosed() {
|
||||||
|
setClosed();
|
||||||
|
}
|
||||||
|
}
|
60
src/main/java/org/jboss/netty/channel/iostream/IOStreamChannelFactory.java
Executable file
60
src/main/java/org/jboss/netty/channel/iostream/IOStreamChannelFactory.java
Executable file
@ -0,0 +1,60 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2011 Red Hat, Inc.
|
||||||
|
*
|
||||||
|
* Red Hat licenses this file to you under the Apache License, version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with the
|
||||||
|
* License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.jboss.netty.channel.iostream;
|
||||||
|
|
||||||
|
|
||||||
|
import org.jboss.netty.channel.Channel;
|
||||||
|
import org.jboss.netty.channel.ChannelFactory;
|
||||||
|
import org.jboss.netty.channel.ChannelPipeline;
|
||||||
|
import org.jboss.netty.channel.group.ChannelGroup;
|
||||||
|
import org.jboss.netty.channel.group.ChannelGroupFuture;
|
||||||
|
import org.jboss.netty.channel.group.DefaultChannelGroup;
|
||||||
|
import org.jboss.netty.util.internal.ExecutorUtil;
|
||||||
|
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A {@link org.jboss.netty.channel.ChannelFactory} for creating {@link IOStreamChannel} instances.
|
||||||
|
*
|
||||||
|
* @author Daniel Bimschas
|
||||||
|
* @author Dennis Pfisterer
|
||||||
|
*/
|
||||||
|
public class IOStreamChannelFactory implements ChannelFactory {
|
||||||
|
|
||||||
|
private final ChannelGroup channels = new DefaultChannelGroup("IOStreamChannelFactory-ChannelGroup");
|
||||||
|
|
||||||
|
private final ExecutorService executorService;
|
||||||
|
|
||||||
|
public IOStreamChannelFactory(ExecutorService executorService) {
|
||||||
|
this.executorService = executorService;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Channel newChannel(final ChannelPipeline pipeline) {
|
||||||
|
IOStreamChannelSink sink = new IOStreamChannelSink(executorService);
|
||||||
|
IOStreamChannel channel = new IOStreamChannel(this, pipeline, sink);
|
||||||
|
sink.setChannel(channel);
|
||||||
|
channels.add(channel);
|
||||||
|
return channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void releaseExternalResources() {
|
||||||
|
ChannelGroupFuture close = channels.close();
|
||||||
|
close.awaitUninterruptibly();
|
||||||
|
ExecutorUtil.terminate(executorService);
|
||||||
|
}
|
||||||
|
}
|
178
src/main/java/org/jboss/netty/channel/iostream/IOStreamChannelSink.java
Executable file
178
src/main/java/org/jboss/netty/channel/iostream/IOStreamChannelSink.java
Executable file
@ -0,0 +1,178 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2011 Red Hat, Inc.
|
||||||
|
*
|
||||||
|
* Red Hat licenses this file to you under the Apache License, version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with the
|
||||||
|
* License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.jboss.netty.channel.iostream;
|
||||||
|
|
||||||
|
import org.jboss.netty.buffer.ChannelBuffer;
|
||||||
|
import org.jboss.netty.buffer.ChannelBuffers;
|
||||||
|
import org.jboss.netty.channel.*;
|
||||||
|
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.io.PushbackInputStream;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
|
import static org.jboss.netty.channel.Channels.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A {@link org.jboss.netty.channel.ChannelSink} implementation which reads from an {@link java.io.InputStream} and
|
||||||
|
* writes to an {@link java.io.OutputStream}.
|
||||||
|
*
|
||||||
|
* @author Daniel Bimschas
|
||||||
|
* @author Dennis Pfisterer
|
||||||
|
*/
|
||||||
|
public class IOStreamChannelSink extends AbstractChannelSink {
|
||||||
|
|
||||||
|
private static class ReadRunnable implements Runnable {
|
||||||
|
|
||||||
|
private final IOStreamChannelSink channelSink;
|
||||||
|
|
||||||
|
public ReadRunnable(final IOStreamChannelSink channelSink) {
|
||||||
|
this.channelSink = channelSink;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
|
||||||
|
PushbackInputStream in = channelSink.inputStream;
|
||||||
|
|
||||||
|
while (channelSink.channel.isOpen()) {
|
||||||
|
|
||||||
|
byte[] buf;
|
||||||
|
int readBytes;
|
||||||
|
try {
|
||||||
|
int bytesToRead = in.available();
|
||||||
|
if (bytesToRead > 0) {
|
||||||
|
buf = new byte[bytesToRead];
|
||||||
|
readBytes = in.read(buf);
|
||||||
|
} else {
|
||||||
|
// peek into the stream if it was closed (value=-1)
|
||||||
|
int b = in.read();
|
||||||
|
if (b < 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// push back the byte which was read too much
|
||||||
|
in.unread(b);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
if (!channelSink.channel.getCloseFuture().isDone()) {
|
||||||
|
fireExceptionCaught(channelSink.channel, t);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
fireMessageReceived(channelSink.channel, ChannelBuffers.wrappedBuffer(buf, 0, readBytes));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up.
|
||||||
|
close(channelSink.channel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final ExecutorService executorService;
|
||||||
|
|
||||||
|
private IOStreamChannel channel;
|
||||||
|
|
||||||
|
public IOStreamChannelSink(final ExecutorService executorService) {
|
||||||
|
this.executorService = executorService;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isConnected() {
|
||||||
|
return inputStream != null && outputStream != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IOStreamAddress getRemoteAddress() {
|
||||||
|
return remoteAddress;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isBound() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ChannelConfig getConfig() {
|
||||||
|
return config;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setChannel(final IOStreamChannel channel) {
|
||||||
|
this.channel = channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
private IOStreamAddress remoteAddress;
|
||||||
|
|
||||||
|
private OutputStream outputStream;
|
||||||
|
|
||||||
|
private PushbackInputStream inputStream;
|
||||||
|
|
||||||
|
private ChannelConfig config = new DefaultChannelConfig();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
|
||||||
|
|
||||||
|
final ChannelFuture future = e.getFuture();
|
||||||
|
|
||||||
|
if (e instanceof ChannelStateEvent) {
|
||||||
|
|
||||||
|
final ChannelStateEvent stateEvent = (ChannelStateEvent) e;
|
||||||
|
final ChannelState state = stateEvent.getState();
|
||||||
|
final Object value = stateEvent.getValue();
|
||||||
|
|
||||||
|
switch (state) {
|
||||||
|
|
||||||
|
case OPEN:
|
||||||
|
if (Boolean.FALSE.equals(value)) {
|
||||||
|
outputStream = null;
|
||||||
|
inputStream = null;
|
||||||
|
((IOStreamChannel) e.getChannel()).doSetClosed();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case BOUND:
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
|
||||||
|
case CONNECTED:
|
||||||
|
if (value != null) {
|
||||||
|
remoteAddress = (IOStreamAddress) value;
|
||||||
|
outputStream = remoteAddress.getOutputStream();
|
||||||
|
inputStream = new PushbackInputStream(remoteAddress.getInputStream());
|
||||||
|
executorService.execute(new ReadRunnable(this));
|
||||||
|
future.setSuccess();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case INTEREST_OPS:
|
||||||
|
// TODO implement
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if (e instanceof MessageEvent) {
|
||||||
|
|
||||||
|
final MessageEvent event = (MessageEvent) e;
|
||||||
|
if (event.getMessage() instanceof ChannelBuffer) {
|
||||||
|
|
||||||
|
final ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
|
||||||
|
buffer.readBytes(outputStream, buffer.readableBytes());
|
||||||
|
outputStream.flush();
|
||||||
|
future.setSuccess();
|
||||||
|
|
||||||
|
} else {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Only ChannelBuffer objects are supported to be written onto the IOStreamChannelSink! "
|
||||||
|
+ "Please check if the encoder pipeline is configured correctly."
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
94
src/main/java/org/jboss/netty/example/iostream/IOStream.java
Executable file
94
src/main/java/org/jboss/netty/example/iostream/IOStream.java
Executable file
@ -0,0 +1,94 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2011 Red Hat, Inc.
|
||||||
|
*
|
||||||
|
* Red Hat licenses this file to you under the Apache License, version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with the
|
||||||
|
* License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.jboss.netty.example.iostream;
|
||||||
|
|
||||||
|
import org.jboss.netty.bootstrap.ClientBootstrap;
|
||||||
|
import org.jboss.netty.channel.*;
|
||||||
|
import org.jboss.netty.channel.iostream.IOStreamAddress;
|
||||||
|
import org.jboss.netty.channel.iostream.IOStreamChannelFactory;
|
||||||
|
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
|
||||||
|
import org.jboss.netty.handler.codec.frame.Delimiters;
|
||||||
|
import org.jboss.netty.handler.codec.string.StringDecoder;
|
||||||
|
import org.jboss.netty.handler.codec.string.StringEncoder;
|
||||||
|
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An example demonstrating the use of the {@link org.jboss.netty.channel.iostream.IOStreamChannel}.
|
||||||
|
*
|
||||||
|
* @author Daniel Bimschas
|
||||||
|
* @author Dennis Pfisterer
|
||||||
|
*/
|
||||||
|
public class IOStream {
|
||||||
|
|
||||||
|
private static volatile boolean running = true;
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
|
||||||
|
final ExecutorService executorService = Executors.newCachedThreadPool();
|
||||||
|
final ClientBootstrap bootstrap = new ClientBootstrap(new IOStreamChannelFactory(executorService));
|
||||||
|
|
||||||
|
// Configure the event pipeline factory.
|
||||||
|
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
|
||||||
|
public ChannelPipeline getPipeline() throws Exception {
|
||||||
|
DefaultChannelPipeline pipeline = new DefaultChannelPipeline();
|
||||||
|
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
|
||||||
|
pipeline.addLast("decoder", new StringDecoder());
|
||||||
|
pipeline.addLast("encoder", new StringEncoder());
|
||||||
|
pipeline.addLast("loggingHandler", new SimpleChannelHandler() {
|
||||||
|
@Override
|
||||||
|
public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e)
|
||||||
|
throws Exception {
|
||||||
|
|
||||||
|
final String message = (String) e.getMessage();
|
||||||
|
synchronized (System.out) {
|
||||||
|
e.getChannel().write("Message received: " + message);
|
||||||
|
}
|
||||||
|
if ("exit".equals(message)) {
|
||||||
|
IOStream.running = false;
|
||||||
|
}
|
||||||
|
super.messageReceived(ctx, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
return pipeline;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Make a new connection.
|
||||||
|
ChannelFuture connectFuture = bootstrap.connect(new IOStreamAddress(System.in, System.out));
|
||||||
|
|
||||||
|
// Wait until the connection is made successfully.
|
||||||
|
Channel channel = connectFuture.awaitUninterruptibly().getChannel();
|
||||||
|
|
||||||
|
while (running) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the connection.
|
||||||
|
channel.close().awaitUninterruptibly();
|
||||||
|
|
||||||
|
// Shut down all thread pools to exit.
|
||||||
|
bootstrap.releaseExternalResources();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user