feat(example-mqtt): new MQTT heartBeat broker and client examples (#9336)
Motivation: Recently I'm going to build MQTT broker and client based on Netty. I had MQTT encoder and decoder founded, while no basic examples. So I'm going to share my simple heartBeat MQTT broker and client as an example. Modification: New MQTT heartBeat example under io.netty.example/mqtt/heartBeat/. Result: Client would send CONNECT and PINGREQ(heartBeat message). - CONNECT: once channel active - PINGREQ: once IdleStateEvent triggered, which is 20 seconds in this example Client would discard all messages it received. MQTT broker could handle CONNECT, PINGREQ and DISCONNECT messages. - CONNECT: send CONNACK back - PINGREQ: send PINGRESP back - DISCONNECT: close the channel Broker would close the channel if 2 heartBeat lost, which set to 45 seconds in this example.
This commit is contained in:
parent
794bb6c7b6
commit
40228411d7
@ -98,6 +98,11 @@
|
||||
<artifactId>netty-codec-stomp</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>netty-codec-mqtt</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.google.protobuf</groupId>
|
||||
|
@ -0,0 +1,64 @@
|
||||
/*
|
||||
* Copyright 2019 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.example.mqtt.heartBeat;
|
||||
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.handler.codec.mqtt.MqttDecoder;
|
||||
import io.netty.handler.codec.mqtt.MqttEncoder;
|
||||
import io.netty.handler.timeout.IdleStateHandler;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public final class MqttHeartBeatBroker {
|
||||
|
||||
private MqttHeartBeatBroker() {
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
|
||||
EventLoopGroup workerGroup = new NioEventLoopGroup();
|
||||
|
||||
try {
|
||||
ServerBootstrap b = new ServerBootstrap();
|
||||
b.group(bossGroup, workerGroup);
|
||||
b.option(ChannelOption.SO_BACKLOG, 1024);
|
||||
b.channel(NioServerSocketChannel.class);
|
||||
b.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||
protected void initChannel(SocketChannel ch) throws Exception {
|
||||
ch.pipeline().addLast("encoder", MqttEncoder.INSTANCE);
|
||||
ch.pipeline().addLast("decoder", new MqttDecoder());
|
||||
ch.pipeline().addLast("heartBeatHandler", new IdleStateHandler(45, 0, 0, TimeUnit.SECONDS));
|
||||
ch.pipeline().addLast("handler", MqttHeartBeatBrokerHandler.INSTANCE);
|
||||
}
|
||||
});
|
||||
|
||||
ChannelFuture f = b.bind(1883).sync();
|
||||
System.out.println("Broker initiated...");
|
||||
|
||||
f.channel().closeFuture().sync();
|
||||
} finally {
|
||||
workerGroup.shutdownGracefully();
|
||||
bossGroup.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,82 @@
|
||||
/*
|
||||
* Copyright 2019 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.example.mqtt.heartBeat;
|
||||
|
||||
import io.netty.channel.ChannelHandler.Sharable;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
|
||||
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
|
||||
import io.netty.handler.codec.mqtt.MqttFixedHeader;
|
||||
import io.netty.handler.codec.mqtt.MqttMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttMessageType;
|
||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||
import io.netty.handler.timeout.IdleState;
|
||||
import io.netty.handler.timeout.IdleStateEvent;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
|
||||
@Sharable
|
||||
public final class MqttHeartBeatBrokerHandler extends ChannelInboundHandlerAdapter {
|
||||
|
||||
public static final MqttHeartBeatBrokerHandler INSTANCE = new MqttHeartBeatBrokerHandler();
|
||||
|
||||
private MqttHeartBeatBrokerHandler() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
MqttMessage mqttMessage = (MqttMessage) msg;
|
||||
System.out.println("Received MQTT message: " + mqttMessage);
|
||||
switch (mqttMessage.fixedHeader().messageType()) {
|
||||
case CONNECT:
|
||||
MqttFixedHeader connackFixedHeader =
|
||||
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
||||
MqttConnAckVariableHeader mqttConnAckVariableHeader =
|
||||
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false);
|
||||
MqttConnAckMessage connack = new MqttConnAckMessage(connackFixedHeader, mqttConnAckVariableHeader);
|
||||
ctx.writeAndFlush(connack);
|
||||
break;
|
||||
case PINGREQ:
|
||||
MqttFixedHeader pingreqFixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false,
|
||||
MqttQoS.AT_MOST_ONCE, false, 0);
|
||||
MqttMessage pingResp = new MqttMessage(pingreqFixedHeader);
|
||||
ctx.writeAndFlush(pingResp);
|
||||
break;
|
||||
case DISCONNECT:
|
||||
ctx.close();
|
||||
break;
|
||||
default:
|
||||
System.out.println("Unexpected message type: " + mqttMessage.fixedHeader().messageType());
|
||||
ReferenceCountUtil.release(msg);
|
||||
ctx.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||
System.out.println("Channel heartBeat lost");
|
||||
if (evt instanceof IdleStateEvent && IdleState.READER_IDLE == ((IdleStateEvent) evt).state()) {
|
||||
ctx.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
cause.printStackTrace();
|
||||
ctx.close();
|
||||
}
|
||||
}
|
@ -0,0 +1,64 @@
|
||||
/*
|
||||
* Copyright 2019 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.example.mqtt.heartBeat;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.handler.codec.mqtt.MqttDecoder;
|
||||
import io.netty.handler.codec.mqtt.MqttEncoder;
|
||||
import io.netty.handler.timeout.IdleStateHandler;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public final class MqttHeartBeatClient {
|
||||
private MqttHeartBeatClient() {
|
||||
}
|
||||
|
||||
private static final String HOST = System.getProperty("host", "127.0.0.1");
|
||||
private static final int PORT = Integer.parseInt(System.getProperty("port", "1883"));
|
||||
private static final String CLIENT_ID = System.getProperty("clientId", "guestClient");
|
||||
private static final String USER_NAME = System.getProperty("userName", "guest");
|
||||
private static final String PASSWORD = System.getProperty("password", "guest");
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
EventLoopGroup workerGroup = new NioEventLoopGroup();
|
||||
|
||||
try {
|
||||
Bootstrap b = new Bootstrap();
|
||||
b.group(workerGroup);
|
||||
b.channel(NioSocketChannel.class);
|
||||
b.handler(new ChannelInitializer<SocketChannel>() {
|
||||
protected void initChannel(SocketChannel ch) throws Exception {
|
||||
ch.pipeline().addLast("encoder", MqttEncoder.INSTANCE);
|
||||
ch.pipeline().addLast("decoder", new MqttDecoder());
|
||||
ch.pipeline().addLast("heartBeatHandler", new IdleStateHandler(0, 20, 0, TimeUnit.SECONDS));
|
||||
ch.pipeline().addLast("handler", new MqttHeartBeatClientHandler(CLIENT_ID, USER_NAME, PASSWORD));
|
||||
}
|
||||
});
|
||||
|
||||
ChannelFuture f = b.connect(HOST, PORT).sync();
|
||||
System.out.println("Client connected");
|
||||
f.channel().closeFuture().sync();
|
||||
} finally {
|
||||
workerGroup.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,83 @@
|
||||
/*
|
||||
* Copyright 2019 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.example.mqtt.heartBeat;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.handler.codec.mqtt.MqttConnectMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttConnectPayload;
|
||||
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
|
||||
import io.netty.handler.codec.mqtt.MqttFixedHeader;
|
||||
import io.netty.handler.codec.mqtt.MqttMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttMessageType;
|
||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||
import io.netty.handler.timeout.IdleStateEvent;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
|
||||
public class MqttHeartBeatClientHandler extends ChannelInboundHandlerAdapter {
|
||||
|
||||
private static final String PROTOCOL_NAME_MQTT_3_1_1 = "MQTT";
|
||||
private static final int PROTOCOL_VERSION_MQTT_3_1_1 = 4;
|
||||
|
||||
private final String clientId;
|
||||
private final String userName;
|
||||
private final byte[] password;
|
||||
|
||||
public MqttHeartBeatClientHandler(String clientId, String userName, String password) {
|
||||
this.clientId = clientId;
|
||||
this.userName = userName;
|
||||
this.password = password.getBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
// discard all messages
|
||||
ReferenceCountUtil.release(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
MqttFixedHeader connectFixedHeader =
|
||||
new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
||||
MqttConnectVariableHeader connectVariableHeader =
|
||||
new MqttConnectVariableHeader(PROTOCOL_NAME_MQTT_3_1_1, PROTOCOL_VERSION_MQTT_3_1_1, true, true, false,
|
||||
0, false, false, 20);
|
||||
MqttConnectPayload connectPayload = new MqttConnectPayload(clientId, null, null, userName, password);
|
||||
MqttConnectMessage connectMessage =
|
||||
new MqttConnectMessage(connectFixedHeader, connectVariableHeader, connectPayload);
|
||||
ctx.writeAndFlush(connectMessage);
|
||||
System.out.println("Sent CONNECT");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||
if (evt instanceof IdleStateEvent) {
|
||||
MqttFixedHeader pingreqFixedHeader =
|
||||
new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
||||
MqttMessage pingreqMessage = new MqttMessage(pingreqFixedHeader);
|
||||
ctx.writeAndFlush(pingreqMessage);
|
||||
System.out.println("Sent PINGREQ");
|
||||
} else {
|
||||
super.userEventTriggered(ctx, evt);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
cause.printStackTrace();
|
||||
ctx.close();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user