diff --git a/example/pom.xml b/example/pom.xml index 24a516c097..33aa58823a 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -98,6 +98,11 @@ netty-codec-stomp ${project.version} + + ${project.groupId} + netty-codec-mqtt + ${project.version} + com.google.protobuf diff --git a/example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatBroker.java b/example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatBroker.java new file mode 100644 index 0000000000..47a16adaef --- /dev/null +++ b/example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatBroker.java @@ -0,0 +1,64 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.example.mqtt.heartBeat; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.mqtt.MqttDecoder; +import io.netty.handler.codec.mqtt.MqttEncoder; +import io.netty.handler.timeout.IdleStateHandler; + +import java.util.concurrent.TimeUnit; + +public final class MqttHeartBeatBroker { + + private MqttHeartBeatBroker() { + } + + public static void main(String[] args) throws Exception { + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup); + b.option(ChannelOption.SO_BACKLOG, 1024); + b.channel(NioServerSocketChannel.class); + b.childHandler(new ChannelInitializer() { + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast("encoder", MqttEncoder.INSTANCE); + ch.pipeline().addLast("decoder", new MqttDecoder()); + ch.pipeline().addLast("heartBeatHandler", new IdleStateHandler(45, 0, 0, TimeUnit.SECONDS)); + ch.pipeline().addLast("handler", MqttHeartBeatBrokerHandler.INSTANCE); + } + }); + + ChannelFuture f = b.bind(1883).sync(); + System.out.println("Broker initiated..."); + + f.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } +} diff --git a/example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatBrokerHandler.java b/example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatBrokerHandler.java new file mode 100644 index 0000000000..5505fc29a5 --- /dev/null +++ b/example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatBrokerHandler.java @@ -0,0 +1,82 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.example.mqtt.heartBeat; + +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.mqtt.MqttConnAckMessage; +import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader; +import io.netty.handler.codec.mqtt.MqttConnectReturnCode; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.ReferenceCountUtil; + +@Sharable +public final class MqttHeartBeatBrokerHandler extends ChannelInboundHandlerAdapter { + + public static final MqttHeartBeatBrokerHandler INSTANCE = new MqttHeartBeatBrokerHandler(); + + private MqttHeartBeatBrokerHandler() { + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + MqttMessage mqttMessage = (MqttMessage) msg; + System.out.println("Received MQTT message: " + mqttMessage); + switch (mqttMessage.fixedHeader().messageType()) { + case CONNECT: + MqttFixedHeader connackFixedHeader = + new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttConnAckVariableHeader mqttConnAckVariableHeader = + new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false); + MqttConnAckMessage connack = new MqttConnAckMessage(connackFixedHeader, mqttConnAckVariableHeader); + ctx.writeAndFlush(connack); + break; + case PINGREQ: + MqttFixedHeader pingreqFixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, + MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessage pingResp = new MqttMessage(pingreqFixedHeader); + ctx.writeAndFlush(pingResp); + break; + case DISCONNECT: + ctx.close(); + break; + default: + System.out.println("Unexpected message type: " + mqttMessage.fixedHeader().messageType()); + ReferenceCountUtil.release(msg); + ctx.close(); + } + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + System.out.println("Channel heartBeat lost"); + if (evt instanceof IdleStateEvent && IdleState.READER_IDLE == ((IdleStateEvent) evt).state()) { + ctx.close(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } +} diff --git a/example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatClient.java b/example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatClient.java new file mode 100644 index 0000000000..b9729c78f0 --- /dev/null +++ b/example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatClient.java @@ -0,0 +1,64 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.example.mqtt.heartBeat; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.mqtt.MqttDecoder; +import io.netty.handler.codec.mqtt.MqttEncoder; +import io.netty.handler.timeout.IdleStateHandler; + +import java.util.concurrent.TimeUnit; + +public final class MqttHeartBeatClient { + private MqttHeartBeatClient() { + } + + private static final String HOST = System.getProperty("host", "127.0.0.1"); + private static final int PORT = Integer.parseInt(System.getProperty("port", "1883")); + private static final String CLIENT_ID = System.getProperty("clientId", "guestClient"); + private static final String USER_NAME = System.getProperty("userName", "guest"); + private static final String PASSWORD = System.getProperty("password", "guest"); + + public static void main(String[] args) throws Exception { + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + Bootstrap b = new Bootstrap(); + b.group(workerGroup); + b.channel(NioSocketChannel.class); + b.handler(new ChannelInitializer() { + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast("encoder", MqttEncoder.INSTANCE); + ch.pipeline().addLast("decoder", new MqttDecoder()); + ch.pipeline().addLast("heartBeatHandler", new IdleStateHandler(0, 20, 0, TimeUnit.SECONDS)); + ch.pipeline().addLast("handler", new MqttHeartBeatClientHandler(CLIENT_ID, USER_NAME, PASSWORD)); + } + }); + + ChannelFuture f = b.connect(HOST, PORT).sync(); + System.out.println("Client connected"); + f.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + } + } +} diff --git a/example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatClientHandler.java b/example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatClientHandler.java new file mode 100644 index 0000000000..d04ecf1084 --- /dev/null +++ b/example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatClientHandler.java @@ -0,0 +1,83 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.example.mqtt.heartBeat; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.mqtt.MqttConnectMessage; +import io.netty.handler.codec.mqtt.MqttConnectPayload; +import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.ReferenceCountUtil; + +public class MqttHeartBeatClientHandler extends ChannelInboundHandlerAdapter { + + private static final String PROTOCOL_NAME_MQTT_3_1_1 = "MQTT"; + private static final int PROTOCOL_VERSION_MQTT_3_1_1 = 4; + + private final String clientId; + private final String userName; + private final byte[] password; + + public MqttHeartBeatClientHandler(String clientId, String userName, String password) { + this.clientId = clientId; + this.userName = userName; + this.password = password.getBytes(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + // discard all messages + ReferenceCountUtil.release(msg); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + MqttFixedHeader connectFixedHeader = + new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttConnectVariableHeader connectVariableHeader = + new MqttConnectVariableHeader(PROTOCOL_NAME_MQTT_3_1_1, PROTOCOL_VERSION_MQTT_3_1_1, true, true, false, + 0, false, false, 20); + MqttConnectPayload connectPayload = new MqttConnectPayload(clientId, null, null, userName, password); + MqttConnectMessage connectMessage = + new MqttConnectMessage(connectFixedHeader, connectVariableHeader, connectPayload); + ctx.writeAndFlush(connectMessage); + System.out.println("Sent CONNECT"); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + MqttFixedHeader pingreqFixedHeader = + new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessage pingreqMessage = new MqttMessage(pingreqFixedHeader); + ctx.writeAndFlush(pingreqMessage); + System.out.println("Sent PINGREQ"); + } else { + super.userEventTriggered(ctx, evt); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } +}