我司的一个项目需要对接无人机的 MQTT 协议的数据,实现对无人机状态的实时监控。最终的效果是,后端拉取无人机的在线列表返回给前端,对于某个在线的无人机可以获取它的实时状态,这个状态包含了无人机的经度、纬度、高度、水平速度、垂直速度、电池温度、电池电量百分比、云台俯仰角、视频流 url 等等一系列数据。系统的前端界面将这些数据融入到一个交互式的三维场景中,用户可以看到无人机的实时位置变化,并通过集成的视频流直接观看无人机的实时画面。
由于我之前没有接触过 MQTT 协议,在查阅相关资料后决定使用 Spring Integration 来集成 MQTT。Spring Integration 的设计理念是通过声明式的配置来处理消息流,这正好符合我们处理实时无人机数据的需求。
在开始写项目代码之前,我用 docker 在本地部署了MQTT 的服务端,并参考网上的资料写了一个 Spring Integration 集成 MQTT 的 demo,这个 demo 实现了最基础的消息发布和订阅功能,后续的项目代码均在此 demo 的基础上改造而来。我把这个 demo 开源在我的 Github:jexonn/spring-boot-mqtt-demo: Spring Integration 集成 MQTT 的 demo
依赖配置
<!--mqtt相关依赖 start-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.18</version>
</dependency>
<!--mqtt相关依赖 end-->
MQTT 配置类
@Slf4j
@Configuration
@IntegrationComponentScan
@Getter
@Setter
@ConditionalOnProperty(value = "mqtt.enabled", havingValue = "true", matchIfMissing = false)
public class MqttSubscriberConfig {
@Value("${mqtt.enabled:false}")
private boolean mqttEnabled;
@Value("${mqtt.max.reconnect.attempts:5}")
private int maxReconnectAttempts;
@Value("${mqtt.reconnect.interval.seconds:30}")
private int reconnectIntervalSeconds;
@Value("${mqtt.connection.timeout:10}")
private int connectionTimeout;
@Autowired
private DroneStatusService droneStatusService;
@Autowired
private EvUavAerodromeBookService evUavAerodromeBookService;
// MQTT消息通道的Bean名称
public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
// 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
private static final byte[] WILL_DATA = "offline".getBytes();
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.serverURIs}")
private String hostUrl;
@Value("${mqtt.client.id}")
private String clientId;
@Value("${mqtt.topicSuffix}")
private String topicSuffix;
// 标记MQTT客户端是否已被永久禁用
private volatile boolean permanentlyDisabled = false;
// 标记关闭过程是否正在进行中
private final AtomicBoolean shutdownInProgress = new AtomicBoolean(false);
// MQTT消息驱动通道适配器
private MqttPahoMessageDrivenChannelAdapter adapter;
// 连续失败计数器
private final AtomicInteger consecutiveFailures = new AtomicInteger(0);
/**
* MQTT连接选项配置
*/
@Bean
public MqttConnectOptions getReceiverMqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
if (!username.trim().equals("")) {
options.setUserName(username);
}
options.setPassword(password.toCharArray());
options.setServerURIs(new String[]{hostUrl});
options.setConnectionTimeout(connectionTimeout);
options.setKeepAliveInterval(20);
// 禁用自动重连,由应用自行控制
options.setAutomaticReconnect(false);
options.setCleanSession(true);
options.setMaxInflight(1000);
// 配置遗嘱消息,当客户端异常断开时发送
options.setWill("status/offline", WILL_DATA, 1, true);
return options;
}
/**
* MQTT客户端工厂
*/
@Bean
public MqttPahoClientFactory receiverMqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getReceiverMqttConnectOptions());
return factory;
}
/**
* MQTT消息通道
*/
@Bean(name = CHANNEL_NAME_IN)
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
/**
* 创建并配置MQTT消息驱动通道适配器
* 通道适配器 Channel Adapter 是 Spring Integration 框架中的一个重要概念,用于将消息通道连接到其他系统或传输的端点
*/
@Bean
public MessageProducer inbound() {
if (!mqttEnabled) {
log.info("MQTT is disabled. Skipping MQTT connection setup.");
return null;
}
try {
// 获取所有机场的nestCode并构建订阅主题
List<EvUavAerodromeBook> allUavAerodromeBooks = evUavAerodromeBookService.getAllUavAerodromeBooks();
List<String> allUavAerodromeBooksNestCodes = evUavAerodromeBookService.extractNestCodes(allUavAerodromeBooks);
String[] topics = allUavAerodromeBooksNestCodes.stream()
.map(topic -> "/" + topic + topicSuffix)
.toArray(String[]::new);
// 创建并配置适配器
adapter = new MqttPahoMessageDrivenChannelAdapter(
// 客户端ID
clientId + "_" + System.currentTimeMillis(),
// MQTT客户端工厂
receiverMqttClientFactory(),
// 要订阅的主题
topics);
// 设置连接超时时间
adapter.setCompletionTimeout(connectionTimeout * 1000);
// 设置重连间隔时间
adapter.setRecoveryInterval(reconnectIntervalSeconds * 1000);
// 设置消息转换器
adapter.setConverter(new DefaultPahoMessageConverter());
// 设置QoS
adapter.setQos(1);
// 设置输出通道,接收到消息后会发送到这个通道
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
} catch (Exception e) {
log.error("Failed to create MQTT adapter", e);
return null;
}
}
/**
* MQTT消息处理器
* 用于处理接收到的MQTT消息,更新无人机状态
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_IN)
public MessageHandler handler() {
return message -> {
if (!mqttEnabled || permanentlyDisabled || shutdownInProgress.get()) {
return;
}
try {
String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
String msg = message.getPayload().toString();
String nestCode = extractCustomCodeFromTopic(topic);
droneStatusService.updateDroneStatus(nestCode, msg);
// 成功处理消息,重置失败计数
consecutiveFailures.set(0);
} catch (Exception e) {
log.error("Error processing MQTT message", e);
}
};
}
/**
* 从 topic 提取 customCode
* @param topic
* @return
*/
private String extractCustomCodeFromTopic(String topic) {
// 移除开头的"/"
String trimmedTopic = topic.startsWith("/") ? topic.substring(1) : topic;
// 获取第一个"/"之前的内容
int endIndex = trimmedTopic.indexOf("/");
return endIndex > 0 ? trimmedTopic.substring(0, endIndex) : trimmedTopic;
}
/**
* 处理MQTT连接失败事件
* 记录失败次数,达到最大重试次数时禁用MQTT客户端
*/
@EventListener
public void handleConnectionFailedEvent(MqttConnectionFailedEvent event) {
if (shutdownInProgress.get()) {
return;
}
int failures = consecutiveFailures.incrementAndGet();
if (failures >= maxReconnectAttempts) {
log.error("Maximum MQTT connection attempts ({}) reached. Disabling MQTT client.", maxReconnectAttempts);
permanentlyDisabled = true;
stopAdapter();
} else {
log.warn("MQTT connection attempt {} of {} failed.", failures, maxReconnectAttempts);
}
}
/**
* 停止MQTT适配器,安全地关闭MQTT连接
*/
private void stopAdapter() {
try {
if (adapter != null) {
adapter.stop();
}
} catch (Exception e) {
log.error("Error stopping MQTT adapter", e);
}
}
/**
* 清理方法,在Spring容器销毁前调用,确保MQTT连接被正确关闭
*/
@PreDestroy
public void cleanup() {
shutdownInProgress.set(true);
stopAdapter();
}
}
这个配置类做了很多事情:
- 设置 MQTT 连接选项, 包括用户认证、服务器地址、超时时间等。
- 配置遗嘱消息, 这在无人机意外断开连接时很有用。
- 创建 MQTT 客户端工厂和消息通道。
- 设置消息处理器, 用于处理接收到的 MQTT 消息。
配置文件
mqtt:
enabled: true # 设置为 true 启用 MQTT,false 禁用
max:
reconnect:
attempts: 5 # 最大重连次数
reconnect:
interval:
seconds: 30 # 重连间隔时间(秒)
connection:
timeout: 10 # 连接超时时间(秒)
serverURIs: tcp://<ip>:<port>
username: <username>
password: <password>
client:
id: ${random.value}
topicSuffix: /status/aircraft
无人机状态实体类
DroneStatusVO 用于表示单个无人机的状态, 而 AllDroneStatusVO 则用于汇总所有无人机的状态。
@Data
public class DroneStatusVO {
private boolean isOnline = false;
private JsonNode droneStatus;
}
@Data
public class AllDroneStatusVO {
Integer onlineNum;
Integer offlineNum;
Map<String, JsonNode> onlineDroneStatusList;
List<String> offlineDroneStatusList;
public AllDroneStatusVO() {
this.onlineDroneStatusList = new HashMap<>();
this.offlineDroneStatusList = new ArrayList<>();
}
}
无人机状态服务类
@Slf4j
@Service
public class DroneStatusService {
@Resource
private EvUavBookService evUavBookService;
@Autowired
private ObjectMapper objectMapper;
// 存储无人机状态数据和最后更新时间
private final Map<String, DroneStatusWrapper> droneStatusCache = new ConcurrentHashMap<>();
@Value("${drone.timeout.seconds:30}") // 默认30秒超时
private long timeoutSeconds;
// 内部类,包装DroneStatus和时间戳
private static class DroneStatusWrapper {
private final JsonNode status;
private final long lastUpdateTime;
public DroneStatusWrapper(JsonNode status) {
this.status = status;
this.lastUpdateTime = System.currentTimeMillis();
}
public boolean isExpired(long timeoutMillis) {
return System.currentTimeMillis() - lastUpdateTime > timeoutMillis;
}
}
/**
* 更新无人机状态缓存
*/
public void updateDroneStatus(String nestCode, String jsonData) {
try {
JsonNode droneStatus = objectMapper.readTree(jsonData);
droneStatusCache.put(nestCode, new DroneStatusWrapper(droneStatus));
// log.debug("Updated drone status for {}", nestCode);
} catch (Exception e) {
log.error("Failed to parse drone status data for {}: {}", nestCode, e.getMessage());
}
}
/**
* 获取指定无人机的在线状态
* 如果无人机状态已过期,返回null
*/
public DroneStatusVO getDroneStatus(String nestCode) {
DroneStatusWrapper wrapper = droneStatusCache.get(nestCode);
if (wrapper != null && !wrapper.isExpired(timeoutSeconds * 1000)) {
DroneStatusVO droneStatusVO = new DroneStatusVO();
droneStatusVO.setDroneStatus(wrapper.status);
droneStatusVO.setOnline(true);
return droneStatusVO;
} else {
// 如果状态已过期,从缓存中移除
droneStatusCache.remove(nestCode);
return new DroneStatusVO();
}
}
/**
* 定时清理过期的状态数据
* 每10秒执行一次
*/
@Scheduled(fixedRate = 10000)
public void cleanExpiredStatus() {
long timeoutMillis = timeoutSeconds * 1000;
droneStatusCache.entrySet().removeIf(entry ->
entry.getValue().isExpired(timeoutMillis));
}
/**
* 清除指定无人机的状态缓存
*/
public void clearDroneStatus(String nestCode) {
droneStatusCache.remove(nestCode);
}
/**
* 获取所有在线无人机的状态
*/
public Map<String, JsonNode> getAllOnlineDroneStatus() {
Map<String, JsonNode> onlineDrones = new ConcurrentHashMap<>();
long timeoutMillis = timeoutSeconds * 1000;
droneStatusCache.forEach((nestCode, wrapper) -> {
if (!wrapper.isExpired(timeoutMillis)) {
onlineDrones.put(nestCode, wrapper.status);
}
});
return onlineDrones;
}
/**
* 获取所有无人机的状态信息
* @return
*/
public AllDroneStatusVO getAllDroneStatus() {
AllDroneStatusVO allDroneStatus = new AllDroneStatusVO();
// 获取所有无人机的自定义编码
List<EvUavBook> allUavBooks = evUavBookService.getAllUavBooks();
List<String> allUavBooksCustomCodes = evUavBookService.extractCustomCodes(allUavBooks);
// 获取所有在线无人机的状态
Map<String, JsonNode> allOnlineDroneStatus = getAllOnlineDroneStatus();
for (String code : allUavBooksCustomCodes) {
if (allOnlineDroneStatus.containsKey(code)) {
JsonNode droneStatus = allOnlineDroneStatus.get(code);
allDroneStatus.getOnlineDroneStatusList().put(code, droneStatus);
} else {
allDroneStatus.getOfflineDroneStatusList().add(code);
}
}
allDroneStatus.setOnlineNum(allDroneStatus.getOnlineDroneStatusList().size());
allDroneStatus.setOfflineNum(allDroneStatus.getOfflineDroneStatusList().size());
return allDroneStatus;
}
}