From 4144b437355bbbc99f4021f734e9fd48553662fc Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Thu, 24 Feb 2011 17:26:18 +0900 Subject: [PATCH] NETTY-386 Support for serial connections using RXTX Contributed by Daniel Bimschas and Dennis Pfisterer Initial import after fixing compiler warnings and removing log messages and shut-down hooks --- pom.xml | 10 + .../jboss/netty/channel/rxtx/RXTXChannel.java | 80 +++++ .../netty/channel/rxtx/RXTXChannelConfig.java | 204 +++++++++++ .../channel/rxtx/RXTXChannelFactory.java | 60 ++++ .../netty/channel/rxtx/RXTXChannelSink.java | 335 ++++++++++++++++++ .../netty/channel/rxtx/RXTXDeviceAddress.java | 44 +++ 6 files changed, 733 insertions(+) create mode 100644 src/main/java/org/jboss/netty/channel/rxtx/RXTXChannel.java create mode 100644 src/main/java/org/jboss/netty/channel/rxtx/RXTXChannelConfig.java create mode 100644 src/main/java/org/jboss/netty/channel/rxtx/RXTXChannelFactory.java create mode 100644 src/main/java/org/jboss/netty/channel/rxtx/RXTXChannelSink.java create mode 100644 src/main/java/org/jboss/netty/channel/rxtx/RXTXDeviceAddress.java diff --git a/pom.xml b/pom.xml index e9ea30709d..7a7b9ab0d2 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,16 @@ compile true + + + + + org.rxtx + rxtx + 2.1.7 + compile + true + diff --git a/src/main/java/org/jboss/netty/channel/rxtx/RXTXChannel.java b/src/main/java/org/jboss/netty/channel/rxtx/RXTXChannel.java new file mode 100644 index 0000000000..9510057900 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/rxtx/RXTXChannel.java @@ -0,0 +1,80 @@ +/* + * Copyright 2011 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.rxtx; + + +import java.net.SocketAddress; + +import org.jboss.netty.channel.AbstractChannel; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelConfig; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelSink; + +/** + * A channel to a serial device using the RXTX library. + * + * @author Daniel Bimschas + * @author Dennis Pfisterer + */ +public class RXTXChannel extends AbstractChannel { + + RXTXChannel(final Channel parent, final ChannelFactory factory, final ChannelPipeline pipeline, + final ChannelSink sink) { + super(parent, factory, pipeline, sink); + } + + @Override + public ChannelConfig getConfig() { + return ((RXTXChannelSink) getPipeline().getSink()).getConfig(); + } + + @Override + public boolean isBound() { + return ((RXTXChannelSink) getPipeline().getSink()).isBound(); + } + + @Override + public boolean isConnected() { + return ((RXTXChannelSink) getPipeline().getSink()).isConnected(); + } + + @Override + public SocketAddress getLocalAddress() { + return null; + } + + @Override + public SocketAddress getRemoteAddress() { + return ((RXTXChannelSink) getPipeline().getSink()).getRemoteAddress(); + } + + @Override + public ChannelFuture bind(final SocketAddress localAddress) { + throw new UnsupportedOperationException(); + } + + @Override + public ChannelFuture unbind() { + throw new UnsupportedOperationException(); + } + + void doSetClosed() { + setClosed(); + } +} diff --git a/src/main/java/org/jboss/netty/channel/rxtx/RXTXChannelConfig.java b/src/main/java/org/jboss/netty/channel/rxtx/RXTXChannelConfig.java new file mode 100644 index 0000000000..d8bcadc200 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/rxtx/RXTXChannelConfig.java @@ -0,0 +1,204 @@ +/* + * Copyright 2011 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.rxtx; + +import gnu.io.SerialPort; + +import org.jboss.netty.channel.DefaultChannelConfig; +import org.jboss.netty.util.internal.ConversionUtil; + +/** + * A configuration class for RXTX device connections. + * + * @author Daniel Bimschas + * @author Dennis Pfisterer + */ +public class RXTXChannelConfig extends DefaultChannelConfig { + + public static enum Stopbits { + + STOPBITS_1(SerialPort.STOPBITS_1), + STOPBITS_2(SerialPort.STOPBITS_2), + STOPBITS_1_5(SerialPort.STOPBITS_1_5); + + private int value; + + private Stopbits(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + + public static Stopbits ofValue(int value) { + for (Stopbits stopbit : Stopbits.values()) { + if (stopbit.value == value) { + return stopbit; + } + } + throw new IllegalArgumentException("Unknown value for Stopbits: " + value + "."); + } + } + + public static enum Databits { + + DATABITS_5(SerialPort.DATABITS_5), + DATABITS_6(SerialPort.DATABITS_6), + DATABITS_7(SerialPort.DATABITS_7), + DATABITS_8(SerialPort.DATABITS_8); + + private int value; + + private Databits(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + + public static Databits ofValue(int value) { + for (Databits databit : Databits.values()) { + if (databit.value == value) { + return databit; + } + } + throw new IllegalArgumentException("Unknown value for Databits: " + value + "."); + } + } + + public static enum Paritybit { + + NONE(SerialPort.PARITY_NONE), + ODD(SerialPort.PARITY_ODD), + EVEN(SerialPort.PARITY_EVEN), + MARK(SerialPort.PARITY_MARK), + SPACE(SerialPort.PARITY_SPACE); + + private int value; + + private Paritybit(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + + public static Paritybit ofValue(int value) { + for (Paritybit paritybit : Paritybit.values()) { + if (paritybit.value == value) { + return paritybit; + } + } + throw new IllegalArgumentException("Unknown value for paritybit: " + value + "."); + } + } + + private int baudrate = 115200; + + private boolean dtr = false; + + private boolean rts = false; + + private Stopbits stopbits = RXTXChannelConfig.Stopbits.STOPBITS_1; + + private Databits databits = RXTXChannelConfig.Databits.DATABITS_8; + + private Paritybit paritybit = RXTXChannelConfig.Paritybit.NONE; + + public RXTXChannelConfig() { + // work with defaults ... + } + + public RXTXChannelConfig(final int baudrate, final boolean dtr, final boolean rts, final Stopbits stopbits, + final Databits databits, final Paritybit paritybit) { + this.baudrate = baudrate; + this.dtr = dtr; + this.rts = rts; + this.stopbits = stopbits; + this.databits = databits; + this.paritybit = paritybit; + } + + @Override + public boolean setOption(final String key, final Object value) { + if (key.equals("baudrate")) { + setBaudrate(ConversionUtil.toInt(value)); + return true; + } else if (key.equals("stopbits")) { + setStopbits((Stopbits) value); + return true; + } else if (key.equals("databits")) { + setDatabits((Databits) value); + return true; + } else if (key.equals("paritybit")) { + setParitybit((Paritybit) value); + return true; + } else { + return super.setOption(key, value); + } + } + + public void setBaudrate(final int baudrate) { + this.baudrate = baudrate; + } + + public void setStopbits(final Stopbits stopbits) { + this.stopbits = stopbits; + } + + public void setDatabits(final Databits databits) { + this.databits = databits; + } + + private void setParitybit(final Paritybit paritybit) { + this.paritybit = paritybit; + } + + public int getBaudrate() { + return baudrate; + } + + public Stopbits getStopbits() { + return stopbits; + } + + public Databits getDatabits() { + return databits; + } + + public Paritybit getParitybit() { + return paritybit; + } + + public boolean isDtr() { + return dtr; + } + + public void setDtr(final boolean dtr) { + this.dtr = dtr; + } + + public boolean isRts() { + return rts; + } + + public void setRts(final boolean rts) { + this.rts = rts; + } +} diff --git a/src/main/java/org/jboss/netty/channel/rxtx/RXTXChannelFactory.java b/src/main/java/org/jboss/netty/channel/rxtx/RXTXChannelFactory.java new file mode 100644 index 0000000000..c622cb7460 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/rxtx/RXTXChannelFactory.java @@ -0,0 +1,60 @@ +/* + * Copyright 2011 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.rxtx; + + +import java.util.concurrent.ExecutorService; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.group.ChannelGroup; +import org.jboss.netty.channel.group.ChannelGroupFuture; +import org.jboss.netty.channel.group.DefaultChannelGroup; +import org.jboss.netty.util.internal.ExecutorUtil; + +/** + * A {@link ChannelFactory} for creating {@link RXTXChannel} instances. + * + * @author Daniel Bimschas + * @author Dennis Pfisterer + */ +public class RXTXChannelFactory implements ChannelFactory { + + private final ChannelGroup channels = new DefaultChannelGroup("RXTXChannelFactory-ChannelGroup"); + + private final ExecutorService executor; + + public RXTXChannelFactory(ExecutorService executor) { + this.executor = executor; + } + + @Override + public Channel newChannel(final ChannelPipeline pipeline) { + RXTXChannelSink sink = new RXTXChannelSink(executor); + RXTXChannel channel = new RXTXChannel(null, this, pipeline, sink); + sink.setChannel(channel); + channels.add(channel); + return channel; + } + + @Override + public void releaseExternalResources() { + ChannelGroupFuture close = channels.close(); + close.awaitUninterruptibly(); + ExecutorUtil.terminate(executor); + } +} diff --git a/src/main/java/org/jboss/netty/channel/rxtx/RXTXChannelSink.java b/src/main/java/org/jboss/netty/channel/rxtx/RXTXChannelSink.java new file mode 100644 index 0000000000..a2f503f63e --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/rxtx/RXTXChannelSink.java @@ -0,0 +1,335 @@ +/* + * Copyright 2011 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.rxtx; + +import gnu.io.CommPort; +import gnu.io.CommPortIdentifier; +import gnu.io.NoSuchPortException; +import gnu.io.PortInUseException; +import gnu.io.SerialPort; +import gnu.io.SerialPortEvent; +import gnu.io.SerialPortEventListener; +import gnu.io.UnsupportedCommOperationException; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.util.TooManyListenersException; +import java.util.concurrent.Executor; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.AbstractChannelSink; +import org.jboss.netty.channel.ChannelConfig; +import org.jboss.netty.channel.ChannelEvent; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelSink; +import org.jboss.netty.channel.ChannelState; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.DefaultChannelFuture; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.UpstreamMessageEvent; + +/** + * A {@link ChannelSink} implementation of the RXTX support for JBoss Netty. + * + * @author Daniel Bimschas + * @author Dennis Pfisterer + */ +public class RXTXChannelSink extends AbstractChannelSink { + + private static class WriteRunnable implements Runnable { + + private final DefaultChannelFuture future; + + private final RXTXChannelSink channelSink; + + private final ChannelBuffer message; + + public WriteRunnable(final DefaultChannelFuture future, final RXTXChannelSink channelSink, + final ChannelBuffer message) { + this.future = future; + this.channelSink = channelSink; + this.message = message; + } + + @Override + public void run() { + try { + + channelSink.outputStream.write(message.array(), message.readerIndex(), message.readableBytes()); + channelSink.outputStream.flush(); + future.setSuccess(); + + } catch (Exception e) { + future.setFailure(e); + } + } + } + + private static class ConnectRunnable implements Runnable { + + private final DefaultChannelFuture channelFuture; + + private final RXTXChannelSink channelSink; + + ConnectRunnable(final DefaultChannelFuture channelFuture, final RXTXChannelSink channelSink) { + this.channelFuture = channelFuture; + this.channelSink = channelSink; + } + + @Override + public void run() { + + if (channelSink.closed) { + channelFuture.setFailure(new Exception("Channel is already closed.")); + } else { + try { + connectInternal(); + channelFuture.setSuccess(); + } catch (Exception e) { + channelFuture.setFailure(e); + } + } + + } + + private void connectInternal() + throws NoSuchPortException, PortInUseException, UnsupportedCommOperationException, IOException, + TooManyListenersException { + + final CommPort commPort; + try { + + final CommPortIdentifier cpi = + CommPortIdentifier.getPortIdentifier(channelSink.remoteAddress.getDeviceAddress()); + commPort = cpi.open(this.getClass().getName(), 1000); + + } catch (NoSuchPortException e) { + throw e; + } catch (PortInUseException e) { + throw e; + } + + channelSink.serialPort = (SerialPort) commPort; + channelSink.serialPort.addEventListener(new RXTXSerialPortEventListener(channelSink)); + channelSink.serialPort.notifyOnDataAvailable(true); + channelSink.serialPort.setSerialPortParams( + channelSink.config.getBaudrate(), + channelSink.config.getDatabits().getValue(), + channelSink.config.getStopbits().getValue(), + channelSink.config.getParitybit().getValue() + ); + + channelSink.serialPort.setDTR(channelSink.config.isDtr()); + channelSink.serialPort.setRTS(channelSink.config.isRts()); + + channelSink.outputStream = new BufferedOutputStream(channelSink.serialPort.getOutputStream()); + channelSink.inputStream = new BufferedInputStream(channelSink.serialPort.getInputStream()); + } + } + + private static class DisconnectRunnable implements Runnable { + + private final DefaultChannelFuture channelFuture; + + private final RXTXChannelSink channelSink; + + public DisconnectRunnable(final DefaultChannelFuture channelFuture, final RXTXChannelSink channelSink) { + this.channelFuture = channelFuture; + this.channelSink = channelSink; + } + + @Override + public void run() { + if (channelSink.closed) { + channelFuture.setFailure(new Exception("Channel is already closed.")); + } else { + try { + disconnectInternal(); + channelSink.channel.doSetClosed(); + } catch (Exception e) { + channelFuture.setFailure(e); + } + } + } + + private void disconnectInternal() throws Exception { + + Exception exception = null; + + try { + if (channelSink.inputStream != null) { + channelSink.inputStream.close(); + } + } catch (IOException e) { + exception = e; + } + + try { + if (channelSink.outputStream != null) { + channelSink.outputStream.close(); + } + } catch (IOException e) { + exception = e; + } + + if (channelSink.serialPort != null) { + channelSink.serialPort.removeEventListener(); + channelSink.serialPort.close(); + } + + channelSink.inputStream = null; + channelSink.outputStream = null; + channelSink.serialPort = null; + + if (exception != null) { + throw exception; + } + } + } + + private final Executor executor; + + final RXTXChannelConfig config; + + RXTXChannel channel; + + public RXTXChannelSink(final Executor executor) { + this.executor = executor; + config = new RXTXChannelConfig(); + } + + public boolean isConnected() { + return inputStream != null && outputStream != null; + } + + public RXTXDeviceAddress getRemoteAddress() { + return remoteAddress; + } + + public boolean isBound() { + return false; + } + + public ChannelConfig getConfig() { + return config; + } + + public void setChannel(final RXTXChannel channel) { + this.channel = channel; + } + + private static class RXTXSerialPortEventListener implements SerialPortEventListener { + + private final RXTXChannelSink channelSink; + + public RXTXSerialPortEventListener(final RXTXChannelSink channelSink) { + this.channelSink = channelSink; + } + + @Override + public void serialEvent(final SerialPortEvent event) { + switch (event.getEventType()) { + case SerialPortEvent.DATA_AVAILABLE: + try { + if (channelSink.inputStream != null && channelSink.inputStream.available() > 0) { + int available = channelSink.inputStream.available(); + byte[] buffer = new byte[available]; + int read = channelSink.inputStream.read(buffer); + if (read > 0) { + ChannelBuffer channelBuffer = ChannelBuffers.wrappedBuffer(buffer, 0, read); + UpstreamMessageEvent upstreamMessageEvent = new UpstreamMessageEvent( + channelSink.channel, + channelBuffer, + channelSink.getRemoteAddress() + ); + channelSink.channel.getPipeline().sendUpstream(upstreamMessageEvent); + } + } + } catch (IOException e) { + Channels.fireExceptionCaught(channelSink.channel, e); + channelSink.channel.close(); + } + break; + } + } + } + + RXTXDeviceAddress remoteAddress; + + BufferedOutputStream outputStream; + + BufferedInputStream inputStream; + + SerialPort serialPort; + + volatile boolean closed = false; + + @Override + public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { + + final ChannelFuture future = e.getFuture(); + + if (e instanceof ChannelStateEvent) { + + final ChannelStateEvent stateEvent = (ChannelStateEvent) e; + final ChannelState state = stateEvent.getState(); + final Object value = stateEvent.getValue(); + + switch (state) { + + case OPEN: + if (Boolean.FALSE.equals(value)) { + executor.execute(new DisconnectRunnable((DefaultChannelFuture) future, this)); + } + break; + + case BOUND: + throw new UnsupportedOperationException(); + + case CONNECTED: + if (value != null) { + remoteAddress = (RXTXDeviceAddress) value; + executor.execute(new ConnectRunnable((DefaultChannelFuture) future, this)); + } else { + executor.execute(new DisconnectRunnable((DefaultChannelFuture) future, this)); + } + break; + + case INTEREST_OPS: + throw new UnsupportedOperationException(); + + } + + } else if (e instanceof MessageEvent) { + + final MessageEvent event = (MessageEvent) e; + if (event.getMessage() instanceof ChannelBuffer) { + executor.execute( + new WriteRunnable((DefaultChannelFuture) future, this, (ChannelBuffer) event.getMessage()) + ); + } else { + throw new IllegalArgumentException( + "Only ChannelBuffer objects are supported to be written onto the RXTXChannelSink! " + + "Please check if the encoder pipeline is configured correctly." + ); + } + } + } +} diff --git a/src/main/java/org/jboss/netty/channel/rxtx/RXTXDeviceAddress.java b/src/main/java/org/jboss/netty/channel/rxtx/RXTXDeviceAddress.java new file mode 100644 index 0000000000..677059f939 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/rxtx/RXTXDeviceAddress.java @@ -0,0 +1,44 @@ +/* + * Copyright 2011 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.rxtx; + +import java.net.SocketAddress; + +/** + * A {@link SocketAddress} subclass to wrap the serial port address of a RXTX + * device (e.g. COM1, /dev/ttyUSB0). + * + * @author Daniel Bimschas + * @author Dennis Pfisterer + */ +public class RXTXDeviceAddress extends SocketAddress { + + private static final long serialVersionUID = -2907820090993709523L; + + private final String deviceAddress; + + /** + * + * @param deviceAddress the address of the device (e.g. COM1, /dev/ttyUSB0, ...) + */ + public RXTXDeviceAddress(String deviceAddress) { + this.deviceAddress = deviceAddress; + } + + public String getDeviceAddress() { + return deviceAddress; + } +}