Spring Integration 集成 MQTT 实现无人机状态实时监控

在我司的一个项目中,需要实现对无人机状态的实时监控。我使用Spring Integration框架来集成MQTT协议,实现了无人机数据的实时接收和处理。系统通过MQTT订阅无人机的状态信息(包括位置、速度、电量等),将数据存储在内存中,并通过定时任务来清理过期数据。最终实现了在前端三维场景中展示无人机的实时位置变化,并能查看无人机的实时视频流。这篇文章详细记录了从MQTT配置到具体实现的完整过程。

我司的一个项目需要对接无人机的 MQTT 协议的数据,实现对无人机状态的实时监控。最终的效果是,后端拉取无人机的在线列表返回给前端,对于某个在线的无人机可以获取它的实时状态,这个状态包含了无人机的经度、纬度、高度、水平速度、垂直速度、电池温度、电池电量百分比、云台俯仰角、视频流 url 等等一系列数据。系统的前端界面将这些数据融入到一个交互式的三维场景中,用户可以看到无人机的实时位置变化,并通过集成的视频流直接观看无人机的实时画面。

由于我之前没有接触过 MQTT 协议,在查阅相关资料后决定使用 Spring Integration 来集成 MQTT。Spring Integration 的设计理念是通过声明式的配置来处理消息流,这正好符合我们处理实时无人机数据的需求。

在开始写项目代码之前,我用 docker 在本地部署了MQTT 的服务端,并参考网上的资料写了一个 Spring Integration 集成 MQTT 的 demo,这个 demo 实现了最基础的消息发布和订阅功能,后续的项目代码均在此 demo 的基础上改造而来。我把这个 demo 开源在我的 Github:jexonn/spring-boot-mqtt-demo: Spring Integration 集成 MQTT 的 demo

依赖配置

<!--mqtt相关依赖 start-->
<dependency>  
    <groupId>org.springframework.integration</groupId>  
    <artifactId>spring-integration-stream</artifactId>  
</dependency>  
<dependency>  
    <groupId>org.springframework.integration</groupId>  
    <artifactId>spring-integration-mqtt</artifactId>  
    <version>5.5.18</version>
</dependency>  
<!--mqtt相关依赖 end-->

MQTT 配置类

@Slf4j  
@Configuration  
@IntegrationComponentScan  
@Getter  
@Setter  
@ConditionalOnProperty(value = "mqtt.enabled", havingValue = "true", matchIfMissing = false)  
public class MqttSubscriberConfig {  
  
    @Value("${mqtt.enabled:false}")  
    private boolean mqttEnabled;  
  
    @Value("${mqtt.max.reconnect.attempts:5}")  
    private int maxReconnectAttempts;  
  
    @Value("${mqtt.reconnect.interval.seconds:30}")  
    private int reconnectIntervalSeconds;  
  
    @Value("${mqtt.connection.timeout:10}")  
    private int connectionTimeout;  
  
    @Autowired  
    private DroneStatusService droneStatusService;  
  
    @Autowired  
    private EvUavAerodromeBookService evUavAerodromeBookService;  
  
    // MQTT消息通道的Bean名称  
    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";  
  
    // 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息  
    private static final byte[] WILL_DATA = "offline".getBytes();  
  
    @Value("${mqtt.username}")  
    private String username;  
  
    @Value("${mqtt.password}")  
    private String password;  
  
    @Value("${mqtt.serverURIs}")  
    private String hostUrl;  
  
    @Value("${mqtt.client.id}")  
    private String clientId;  
  
    @Value("${mqtt.topicSuffix}")  
    private String topicSuffix;  
  
    // 标记MQTT客户端是否已被永久禁用  
    private volatile boolean permanentlyDisabled = false;  
    // 标记关闭过程是否正在进行中  
    private final AtomicBoolean shutdownInProgress = new AtomicBoolean(false);  
    // MQTT消息驱动通道适配器  
    private MqttPahoMessageDrivenChannelAdapter adapter;  
    // 连续失败计数器  
    private final AtomicInteger consecutiveFailures = new AtomicInteger(0);  
  
    /**  
     * MQTT连接选项配置  
     */  
    @Bean  
    public MqttConnectOptions getReceiverMqttConnectOptions() {  
        MqttConnectOptions options = new MqttConnectOptions();  
        if (!username.trim().equals("")) {  
            options.setUserName(username);  
        }  
        options.setPassword(password.toCharArray());  
        options.setServerURIs(new String[]{hostUrl});  
        options.setConnectionTimeout(connectionTimeout);  
        options.setKeepAliveInterval(20);  
        // 禁用自动重连,由应用自行控制  
        options.setAutomaticReconnect(false);  
        options.setCleanSession(true);  
        options.setMaxInflight(1000);  
  
        // 配置遗嘱消息,当客户端异常断开时发送  
        options.setWill("status/offline", WILL_DATA, 1, true);  
        return options;  
    }  
  
    /**  
     * MQTT客户端工厂  
     */  
    @Bean  
    public MqttPahoClientFactory receiverMqttClientFactory() {  
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();  
        factory.setConnectionOptions(getReceiverMqttConnectOptions());  
        return factory;  
    }  
  
    /**  
     * MQTT消息通道  
     */  
    @Bean(name = CHANNEL_NAME_IN)  
    public MessageChannel mqttInboundChannel() {  
        return new DirectChannel();  
    }  
  
    /**  
     * 创建并配置MQTT消息驱动通道适配器  
     * 通道适配器 Channel Adapter 是 Spring Integration 框架中的一个重要概念,用于将消息通道连接到其他系统或传输的端点  
     */  
    @Bean  
    public MessageProducer inbound() {  
        if (!mqttEnabled) {  
            log.info("MQTT is disabled. Skipping MQTT connection setup.");  
            return null;  
        }  
  
        try {  
            // 获取所有机场的nestCode并构建订阅主题  
            List<EvUavAerodromeBook> allUavAerodromeBooks = evUavAerodromeBookService.getAllUavAerodromeBooks();  
            List<String> allUavAerodromeBooksNestCodes = evUavAerodromeBookService.extractNestCodes(allUavAerodromeBooks);  
            String[] topics = allUavAerodromeBooksNestCodes.stream()  
                    .map(topic -> "/" + topic + topicSuffix)  
                    .toArray(String[]::new);  
  
            // 创建并配置适配器  
            adapter = new MqttPahoMessageDrivenChannelAdapter(  
                    // 客户端ID  
                    clientId + "_" + System.currentTimeMillis(),  
                    // MQTT客户端工厂  
                    receiverMqttClientFactory(),  
                    // 要订阅的主题  
                    topics);  
  
            // 设置连接超时时间  
            adapter.setCompletionTimeout(connectionTimeout * 1000);  
            // 设置重连间隔时间  
            adapter.setRecoveryInterval(reconnectIntervalSeconds * 1000);  
            // 设置消息转换器  
            adapter.setConverter(new DefaultPahoMessageConverter());  
            // 设置QoS  
            adapter.setQos(1);  
            // 设置输出通道,接收到消息后会发送到这个通道  
            adapter.setOutputChannel(mqttInboundChannel());  
  
            return adapter;  
        } catch (Exception e) {  
            log.error("Failed to create MQTT adapter", e);  
            return null;  
        }  
    }  
  
    /**  
     * MQTT消息处理器  
     * 用于处理接收到的MQTT消息,更新无人机状态  
     */  
    @Bean  
    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)  
    public MessageHandler handler() {  
        return message -> {  
            if (!mqttEnabled || permanentlyDisabled || shutdownInProgress.get()) {  
                return;  
            }  
            try {  
                String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();  
                String msg = message.getPayload().toString();  
                String nestCode = extractCustomCodeFromTopic(topic);  
                droneStatusService.updateDroneStatus(nestCode, msg);  
                // 成功处理消息,重置失败计数  
                consecutiveFailures.set(0);  
            } catch (Exception e) {  
                log.error("Error processing MQTT message", e);  
            }  
        };  
    }  
  
    /**  
     * 从 topic 提取 customCode  
     * @param topic  
     * @return  
     */  
    private String extractCustomCodeFromTopic(String topic) {  
        // 移除开头的"/"  
        String trimmedTopic = topic.startsWith("/") ? topic.substring(1) : topic;  
        // 获取第一个"/"之前的内容  
        int endIndex = trimmedTopic.indexOf("/");  
        return endIndex > 0 ? trimmedTopic.substring(0, endIndex) : trimmedTopic;  
    }  
  
    /**  
     * 处理MQTT连接失败事件  
     * 记录失败次数,达到最大重试次数时禁用MQTT客户端  
     */  
    @EventListener  
    public void handleConnectionFailedEvent(MqttConnectionFailedEvent event) {  
        if (shutdownInProgress.get()) {  
            return;  
        }  
  
        int failures = consecutiveFailures.incrementAndGet();  
        if (failures >= maxReconnectAttempts) {  
            log.error("Maximum MQTT connection attempts ({}) reached. Disabling MQTT client.", maxReconnectAttempts);  
            permanentlyDisabled = true;  
            stopAdapter();  
        } else {  
            log.warn("MQTT connection attempt {} of {} failed.", failures, maxReconnectAttempts);  
        }  
    }  
  
  
    /**  
     * 停止MQTT适配器,安全地关闭MQTT连接  
     */  
    private void stopAdapter() {  
        try {  
            if (adapter != null) {  
                adapter.stop();  
            }  
        } catch (Exception e) {  
            log.error("Error stopping MQTT adapter", e);  
        }  
    }  
  
    /**  
     * 清理方法,在Spring容器销毁前调用,确保MQTT连接被正确关闭  
     */  
    @PreDestroy  
    public void cleanup() {  
        shutdownInProgress.set(true);  
        stopAdapter();  
    }  
}

这个配置类做了很多事情:

  • 设置 MQTT 连接选项, 包括用户认证、服务器地址、超时时间等。
  • 配置遗嘱消息, 这在无人机意外断开连接时很有用。
  • 创建 MQTT 客户端工厂和消息通道。
  • 设置消息处理器, 用于处理接收到的 MQTT 消息。

配置文件

mqtt:
  enabled: true  # 设置为 true 启用 MQTT,false 禁用  
  max:
    reconnect:
      attempts: 5  # 最大重连次数
  reconnect:
    interval:
      seconds: 30  # 重连间隔时间(秒)  
  connection:
    timeout: 10    # 连接超时时间(秒)  
  serverURIs: tcp://<ip>:<port>
  username: <username>
  password: <password>
  client:
    id: ${random.value}
  topicSuffix: /status/aircraft

无人机状态实体类

DroneStatusVO 用于表示单个无人机的状态, 而 AllDroneStatusVO 则用于汇总所有无人机的状态。

@Data  
public class DroneStatusVO {  
  
    private boolean isOnline = false;  
  
    private JsonNode droneStatus;  
}

@Data  
public class AllDroneStatusVO {  
  
    Integer onlineNum;  
  
    Integer offlineNum;  
  
    Map<String, JsonNode> onlineDroneStatusList;  
  
    List<String> offlineDroneStatusList;  
  
    public AllDroneStatusVO() {  
        this.onlineDroneStatusList = new HashMap<>();  
        this.offlineDroneStatusList = new ArrayList<>();  
    }  
}

无人机状态服务类

@Slf4j  
@Service  
public class DroneStatusService {  
  
    @Resource  
    private EvUavBookService evUavBookService;  
  
    @Autowired  
    private ObjectMapper objectMapper;  
      
    // 存储无人机状态数据和最后更新时间  
    private final Map<String, DroneStatusWrapper> droneStatusCache = new ConcurrentHashMap<>();  
  
    @Value("${drone.timeout.seconds:30}")  // 默认30秒超时  
    private long timeoutSeconds;  
  
    // 内部类,包装DroneStatus和时间戳  
    private static class DroneStatusWrapper {  
        private final JsonNode status;  
        private final long lastUpdateTime;  
  
        public DroneStatusWrapper(JsonNode status) {  
            this.status = status;  
            this.lastUpdateTime = System.currentTimeMillis();  
        }  
  
        public boolean isExpired(long timeoutMillis) {  
            return System.currentTimeMillis() - lastUpdateTime > timeoutMillis;  
        }  
    }  
  
    /**  
     * 更新无人机状态缓存  
     */  
    public void updateDroneStatus(String nestCode, String jsonData) {  
        try {  
            JsonNode droneStatus = objectMapper.readTree(jsonData);  
            droneStatusCache.put(nestCode, new DroneStatusWrapper(droneStatus));  
//            log.debug("Updated drone status for {}", nestCode);  
        } catch (Exception e) {  
            log.error("Failed to parse drone status data for {}: {}", nestCode, e.getMessage());  
        }  
    }  
  
    /**  
     * 获取指定无人机的在线状态  
     * 如果无人机状态已过期,返回null  
     */    
     public DroneStatusVO getDroneStatus(String nestCode) {  
        DroneStatusWrapper wrapper = droneStatusCache.get(nestCode);  
        if (wrapper != null && !wrapper.isExpired(timeoutSeconds * 1000)) {  
            DroneStatusVO droneStatusVO = new DroneStatusVO();  
            droneStatusVO.setDroneStatus(wrapper.status);  
            droneStatusVO.setOnline(true);  
            return droneStatusVO;  
        } else {  
            // 如果状态已过期,从缓存中移除  
            droneStatusCache.remove(nestCode);  
            return new DroneStatusVO();  
        }  
    }  
  
    /**  
     * 定时清理过期的状态数据  
     * 每10秒执行一次  
     */  
    @Scheduled(fixedRate = 10000)  
    public void cleanExpiredStatus() {  
        long timeoutMillis = timeoutSeconds * 1000;  
        droneStatusCache.entrySet().removeIf(entry ->  
                entry.getValue().isExpired(timeoutMillis));  
    }  
  
    /**  
     * 清除指定无人机的状态缓存  
     */  
    public void clearDroneStatus(String nestCode) {  
        droneStatusCache.remove(nestCode);  
    }  
  
    /**  
     * 获取所有在线无人机的状态  
     */  
    public Map<String, JsonNode> getAllOnlineDroneStatus() {  
        Map<String, JsonNode> onlineDrones = new ConcurrentHashMap<>();  
        long timeoutMillis = timeoutSeconds * 1000;  
  
        droneStatusCache.forEach((nestCode, wrapper) -> {  
            if (!wrapper.isExpired(timeoutMillis)) {  
                onlineDrones.put(nestCode, wrapper.status);  
            }  
        });  
  
        return onlineDrones;  
    }  
  
    /**  
     * 获取所有无人机的状态信息  
     * @return  
     */  
    public AllDroneStatusVO getAllDroneStatus() {  
        AllDroneStatusVO allDroneStatus = new AllDroneStatusVO();  
  
        // 获取所有无人机的自定义编码  
        List<EvUavBook> allUavBooks = evUavBookService.getAllUavBooks();  
        List<String> allUavBooksCustomCodes = evUavBookService.extractCustomCodes(allUavBooks);  
  
        // 获取所有在线无人机的状态  
        Map<String, JsonNode> allOnlineDroneStatus = getAllOnlineDroneStatus();  
  
        for (String code : allUavBooksCustomCodes) {  
            if (allOnlineDroneStatus.containsKey(code)) {  
                JsonNode droneStatus = allOnlineDroneStatus.get(code);  
                allDroneStatus.getOnlineDroneStatusList().put(code, droneStatus);  
            } else {  
                allDroneStatus.getOfflineDroneStatusList().add(code);  
            }  
        }  
  
        allDroneStatus.setOnlineNum(allDroneStatus.getOnlineDroneStatusList().size());  
        allDroneStatus.setOfflineNum(allDroneStatus.getOfflineDroneStatusList().size());  
  
        return allDroneStatus;  
    }  
}
LICENSED UNDER CC BY-NC-SA 4.0
Comment