MQTT( Message Queuing Telemetry Transport)是一个物联网传输协议,它被设计用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。在实际的开发中,我们通常会用到Spring,这里简单描述一下在SpringBoot中如何集成MQTT。
在Spring的一系列文档中,已经有了对应的集成代码。见:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
@Bean publicMqttPahoClientFactorymqttClientFactory(){ List<String> urls = mqttUrls().getUrls(); DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setServerURIs("tcp://localhost:1883"); return factory; }
@Bean publicIntegrationFlowmqttInFlow(){ return IntegrationFlows.from(mqttInbound()) .transform(p -> p) .handle(mqttService.handler()) .get(); } privateMessageProducerSupportmqttInbound(){ MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("customer", mqttClientFactory(), "test-topic"); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); return adapter; }
@Bean publicIntegrationFlowmqttOutFlow(){ return IntegrationFlows.from(outChannel()) .handle(mqttOutbound()) .get(); } @Bean publicMessageChanneloutChannel(){ return new DirectChannel(); } @Bean publicMessageHandlermqttOutbound(){ MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publisher", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic("test-topic"); return messageHandler; }
@MessagingGateway(defaultRequestChannel = "outChannel") public interface MessageWriter{ void write(String data); }
生产者的使用可以为:
@Autowired MessageWriter messageWriter void publish(String data){ messageWriter.write(data) }