NETTY-386 Support for serial connections using RXTX

Contributed by Daniel Bimschas and Dennis Pfisterer
Initial import after fixing compiler warnings and removing log
messages and shut-down hooks
This commit is contained in:
Trustin Lee 2011-02-24 17:26:18 +09:00
parent 7ab5ec5f74
commit 4144b43735
6 changed files with 733 additions and 0 deletions

10
pom.xml
View File

@ -63,6 +63,16 @@
<scope>compile</scope>
<optional>true</optional>
</dependency>
<!-- RXTX - completely optional -->
<!-- Used for serial port communication transport -->
<dependency>
<groupId>org.rxtx</groupId>
<artifactId>rxtx</artifactId>
<version>2.1.7</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<!-- Servlet API - completely optional -->
<!-- Used for HTTP tunneling transport -->

View File

@ -0,0 +1,80 @@
/*
* Copyright 2011 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.rxtx;
import java.net.SocketAddress;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelConfig;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
/**
* A channel to a serial device using the RXTX library.
*
* @author Daniel Bimschas
* @author Dennis Pfisterer
*/
public class RXTXChannel extends AbstractChannel {
RXTXChannel(final Channel parent, final ChannelFactory factory, final ChannelPipeline pipeline,
final ChannelSink sink) {
super(parent, factory, pipeline, sink);
}
@Override
public ChannelConfig getConfig() {
return ((RXTXChannelSink) getPipeline().getSink()).getConfig();
}
@Override
public boolean isBound() {
return ((RXTXChannelSink) getPipeline().getSink()).isBound();
}
@Override
public boolean isConnected() {
return ((RXTXChannelSink) getPipeline().getSink()).isConnected();
}
@Override
public SocketAddress getLocalAddress() {
return null;
}
@Override
public SocketAddress getRemoteAddress() {
return ((RXTXChannelSink) getPipeline().getSink()).getRemoteAddress();
}
@Override
public ChannelFuture bind(final SocketAddress localAddress) {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture unbind() {
throw new UnsupportedOperationException();
}
void doSetClosed() {
setClosed();
}
}

View File

@ -0,0 +1,204 @@
/*
* Copyright 2011 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.rxtx;
import gnu.io.SerialPort;
import org.jboss.netty.channel.DefaultChannelConfig;
import org.jboss.netty.util.internal.ConversionUtil;
/**
* A configuration class for RXTX device connections.
*
* @author Daniel Bimschas
* @author Dennis Pfisterer
*/
public class RXTXChannelConfig extends DefaultChannelConfig {
public static enum Stopbits {
STOPBITS_1(SerialPort.STOPBITS_1),
STOPBITS_2(SerialPort.STOPBITS_2),
STOPBITS_1_5(SerialPort.STOPBITS_1_5);
private int value;
private Stopbits(int value) {
this.value = value;
}
public int getValue() {
return value;
}
public static Stopbits ofValue(int value) {
for (Stopbits stopbit : Stopbits.values()) {
if (stopbit.value == value) {
return stopbit;
}
}
throw new IllegalArgumentException("Unknown value for Stopbits: " + value + ".");
}
}
public static enum Databits {
DATABITS_5(SerialPort.DATABITS_5),
DATABITS_6(SerialPort.DATABITS_6),
DATABITS_7(SerialPort.DATABITS_7),
DATABITS_8(SerialPort.DATABITS_8);
private int value;
private Databits(int value) {
this.value = value;
}
public int getValue() {
return value;
}
public static Databits ofValue(int value) {
for (Databits databit : Databits.values()) {
if (databit.value == value) {
return databit;
}
}
throw new IllegalArgumentException("Unknown value for Databits: " + value + ".");
}
}
public static enum Paritybit {
NONE(SerialPort.PARITY_NONE),
ODD(SerialPort.PARITY_ODD),
EVEN(SerialPort.PARITY_EVEN),
MARK(SerialPort.PARITY_MARK),
SPACE(SerialPort.PARITY_SPACE);
private int value;
private Paritybit(int value) {
this.value = value;
}
public int getValue() {
return value;
}
public static Paritybit ofValue(int value) {
for (Paritybit paritybit : Paritybit.values()) {
if (paritybit.value == value) {
return paritybit;
}
}
throw new IllegalArgumentException("Unknown value for paritybit: " + value + ".");
}
}
private int baudrate = 115200;
private boolean dtr = false;
private boolean rts = false;
private Stopbits stopbits = RXTXChannelConfig.Stopbits.STOPBITS_1;
private Databits databits = RXTXChannelConfig.Databits.DATABITS_8;
private Paritybit paritybit = RXTXChannelConfig.Paritybit.NONE;
public RXTXChannelConfig() {
// work with defaults ...
}
public RXTXChannelConfig(final int baudrate, final boolean dtr, final boolean rts, final Stopbits stopbits,
final Databits databits, final Paritybit paritybit) {
this.baudrate = baudrate;
this.dtr = dtr;
this.rts = rts;
this.stopbits = stopbits;
this.databits = databits;
this.paritybit = paritybit;
}
@Override
public boolean setOption(final String key, final Object value) {
if (key.equals("baudrate")) {
setBaudrate(ConversionUtil.toInt(value));
return true;
} else if (key.equals("stopbits")) {
setStopbits((Stopbits) value);
return true;
} else if (key.equals("databits")) {
setDatabits((Databits) value);
return true;
} else if (key.equals("paritybit")) {
setParitybit((Paritybit) value);
return true;
} else {
return super.setOption(key, value);
}
}
public void setBaudrate(final int baudrate) {
this.baudrate = baudrate;
}
public void setStopbits(final Stopbits stopbits) {
this.stopbits = stopbits;
}
public void setDatabits(final Databits databits) {
this.databits = databits;
}
private void setParitybit(final Paritybit paritybit) {
this.paritybit = paritybit;
}
public int getBaudrate() {
return baudrate;
}
public Stopbits getStopbits() {
return stopbits;
}
public Databits getDatabits() {
return databits;
}
public Paritybit getParitybit() {
return paritybit;
}
public boolean isDtr() {
return dtr;
}
public void setDtr(final boolean dtr) {
this.dtr = dtr;
}
public boolean isRts() {
return rts;
}
public void setRts(final boolean rts) {
this.rts = rts;
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright 2011 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.rxtx;
import java.util.concurrent.ExecutorService;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.util.internal.ExecutorUtil;
/**
* A {@link ChannelFactory} for creating {@link RXTXChannel} instances.
*
* @author Daniel Bimschas
* @author Dennis Pfisterer
*/
public class RXTXChannelFactory implements ChannelFactory {
private final ChannelGroup channels = new DefaultChannelGroup("RXTXChannelFactory-ChannelGroup");
private final ExecutorService executor;
public RXTXChannelFactory(ExecutorService executor) {
this.executor = executor;
}
@Override
public Channel newChannel(final ChannelPipeline pipeline) {
RXTXChannelSink sink = new RXTXChannelSink(executor);
RXTXChannel channel = new RXTXChannel(null, this, pipeline, sink);
sink.setChannel(channel);
channels.add(channel);
return channel;
}
@Override
public void releaseExternalResources() {
ChannelGroupFuture close = channels.close();
close.awaitUninterruptibly();
ExecutorUtil.terminate(executor);
}
}

View File

@ -0,0 +1,335 @@
/*
* Copyright 2011 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.rxtx;
import gnu.io.CommPort;
import gnu.io.CommPortIdentifier;
import gnu.io.NoSuchPortException;
import gnu.io.PortInUseException;
import gnu.io.SerialPort;
import gnu.io.SerialPortEvent;
import gnu.io.SerialPortEventListener;
import gnu.io.UnsupportedCommOperationException;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.util.TooManyListenersException;
import java.util.concurrent.Executor;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.AbstractChannelSink;
import org.jboss.netty.channel.ChannelConfig;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.DefaultChannelFuture;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.UpstreamMessageEvent;
/**
* A {@link ChannelSink} implementation of the RXTX support for JBoss Netty.
*
* @author Daniel Bimschas
* @author Dennis Pfisterer
*/
public class RXTXChannelSink extends AbstractChannelSink {
private static class WriteRunnable implements Runnable {
private final DefaultChannelFuture future;
private final RXTXChannelSink channelSink;
private final ChannelBuffer message;
public WriteRunnable(final DefaultChannelFuture future, final RXTXChannelSink channelSink,
final ChannelBuffer message) {
this.future = future;
this.channelSink = channelSink;
this.message = message;
}
@Override
public void run() {
try {
channelSink.outputStream.write(message.array(), message.readerIndex(), message.readableBytes());
channelSink.outputStream.flush();
future.setSuccess();
} catch (Exception e) {
future.setFailure(e);
}
}
}
private static class ConnectRunnable implements Runnable {
private final DefaultChannelFuture channelFuture;
private final RXTXChannelSink channelSink;
ConnectRunnable(final DefaultChannelFuture channelFuture, final RXTXChannelSink channelSink) {
this.channelFuture = channelFuture;
this.channelSink = channelSink;
}
@Override
public void run() {
if (channelSink.closed) {
channelFuture.setFailure(new Exception("Channel is already closed."));
} else {
try {
connectInternal();
channelFuture.setSuccess();
} catch (Exception e) {
channelFuture.setFailure(e);
}
}
}
private void connectInternal()
throws NoSuchPortException, PortInUseException, UnsupportedCommOperationException, IOException,
TooManyListenersException {
final CommPort commPort;
try {
final CommPortIdentifier cpi =
CommPortIdentifier.getPortIdentifier(channelSink.remoteAddress.getDeviceAddress());
commPort = cpi.open(this.getClass().getName(), 1000);
} catch (NoSuchPortException e) {
throw e;
} catch (PortInUseException e) {
throw e;
}
channelSink.serialPort = (SerialPort) commPort;
channelSink.serialPort.addEventListener(new RXTXSerialPortEventListener(channelSink));
channelSink.serialPort.notifyOnDataAvailable(true);
channelSink.serialPort.setSerialPortParams(
channelSink.config.getBaudrate(),
channelSink.config.getDatabits().getValue(),
channelSink.config.getStopbits().getValue(),
channelSink.config.getParitybit().getValue()
);
channelSink.serialPort.setDTR(channelSink.config.isDtr());
channelSink.serialPort.setRTS(channelSink.config.isRts());
channelSink.outputStream = new BufferedOutputStream(channelSink.serialPort.getOutputStream());
channelSink.inputStream = new BufferedInputStream(channelSink.serialPort.getInputStream());
}
}
private static class DisconnectRunnable implements Runnable {
private final DefaultChannelFuture channelFuture;
private final RXTXChannelSink channelSink;
public DisconnectRunnable(final DefaultChannelFuture channelFuture, final RXTXChannelSink channelSink) {
this.channelFuture = channelFuture;
this.channelSink = channelSink;
}
@Override
public void run() {
if (channelSink.closed) {
channelFuture.setFailure(new Exception("Channel is already closed."));
} else {
try {
disconnectInternal();
channelSink.channel.doSetClosed();
} catch (Exception e) {
channelFuture.setFailure(e);
}
}
}
private void disconnectInternal() throws Exception {
Exception exception = null;
try {
if (channelSink.inputStream != null) {
channelSink.inputStream.close();
}
} catch (IOException e) {
exception = e;
}
try {
if (channelSink.outputStream != null) {
channelSink.outputStream.close();
}
} catch (IOException e) {
exception = e;
}
if (channelSink.serialPort != null) {
channelSink.serialPort.removeEventListener();
channelSink.serialPort.close();
}
channelSink.inputStream = null;
channelSink.outputStream = null;
channelSink.serialPort = null;
if (exception != null) {
throw exception;
}
}
}
private final Executor executor;
final RXTXChannelConfig config;
RXTXChannel channel;
public RXTXChannelSink(final Executor executor) {
this.executor = executor;
config = new RXTXChannelConfig();
}
public boolean isConnected() {
return inputStream != null && outputStream != null;
}
public RXTXDeviceAddress getRemoteAddress() {
return remoteAddress;
}
public boolean isBound() {
return false;
}
public ChannelConfig getConfig() {
return config;
}
public void setChannel(final RXTXChannel channel) {
this.channel = channel;
}
private static class RXTXSerialPortEventListener implements SerialPortEventListener {
private final RXTXChannelSink channelSink;
public RXTXSerialPortEventListener(final RXTXChannelSink channelSink) {
this.channelSink = channelSink;
}
@Override
public void serialEvent(final SerialPortEvent event) {
switch (event.getEventType()) {
case SerialPortEvent.DATA_AVAILABLE:
try {
if (channelSink.inputStream != null && channelSink.inputStream.available() > 0) {
int available = channelSink.inputStream.available();
byte[] buffer = new byte[available];
int read = channelSink.inputStream.read(buffer);
if (read > 0) {
ChannelBuffer channelBuffer = ChannelBuffers.wrappedBuffer(buffer, 0, read);
UpstreamMessageEvent upstreamMessageEvent = new UpstreamMessageEvent(
channelSink.channel,
channelBuffer,
channelSink.getRemoteAddress()
);
channelSink.channel.getPipeline().sendUpstream(upstreamMessageEvent);
}
}
} catch (IOException e) {
Channels.fireExceptionCaught(channelSink.channel, e);
channelSink.channel.close();
}
break;
}
}
}
RXTXDeviceAddress remoteAddress;
BufferedOutputStream outputStream;
BufferedInputStream inputStream;
SerialPort serialPort;
volatile boolean closed = false;
@Override
public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
final ChannelFuture future = e.getFuture();
if (e instanceof ChannelStateEvent) {
final ChannelStateEvent stateEvent = (ChannelStateEvent) e;
final ChannelState state = stateEvent.getState();
final Object value = stateEvent.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
executor.execute(new DisconnectRunnable((DefaultChannelFuture) future, this));
}
break;
case BOUND:
throw new UnsupportedOperationException();
case CONNECTED:
if (value != null) {
remoteAddress = (RXTXDeviceAddress) value;
executor.execute(new ConnectRunnable((DefaultChannelFuture) future, this));
} else {
executor.execute(new DisconnectRunnable((DefaultChannelFuture) future, this));
}
break;
case INTEREST_OPS:
throw new UnsupportedOperationException();
}
} else if (e instanceof MessageEvent) {
final MessageEvent event = (MessageEvent) e;
if (event.getMessage() instanceof ChannelBuffer) {
executor.execute(
new WriteRunnable((DefaultChannelFuture) future, this, (ChannelBuffer) event.getMessage())
);
} else {
throw new IllegalArgumentException(
"Only ChannelBuffer objects are supported to be written onto the RXTXChannelSink! "
+ "Please check if the encoder pipeline is configured correctly."
);
}
}
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2011 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.rxtx;
import java.net.SocketAddress;
/**
* A {@link SocketAddress} subclass to wrap the serial port address of a RXTX
* device (e.g. COM1, /dev/ttyUSB0).
*
* @author Daniel Bimschas
* @author Dennis Pfisterer
*/
public class RXTXDeviceAddress extends SocketAddress {
private static final long serialVersionUID = -2907820090993709523L;
private final String deviceAddress;
/**
*
* @param deviceAddress the address of the device (e.g. COM1, /dev/ttyUSB0, ...)
*/
public RXTXDeviceAddress(String deviceAddress) {
this.deviceAddress = deviceAddress;
}
public String getDeviceAddress() {
return deviceAddress;
}
}