知乎专栏 |
https://docs.spring.io/spring-integration/reference/mqtt.html
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>6.2.1</version> </dependency>
package cn.netkiller.config; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.handler.annotation.Header; @Configuration @EnableIntegration @IntegrationComponentScan public class MqttConfiguration { @Value("${mqtt.broker}") private String broker; @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; private final int qos = 2; @Value("${mqtt.topic.prefix}") private String prefix; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[]{broker}); options.setUserName(username); options.setPassword(password.toCharArray()); options.setAutomaticReconnect(true); options.setCleanSession(false); options.setKeepAliveInterval(20); // options.setConnectionTimeout(30000); // options.setExecutorServiceTimeout(30000); factory.setConnectionOptions(options); return factory; } @Bean @ServiceActivator(inputChannel = 
"mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("testClient", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic("testTopic"); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MyGateway { void sendToMqtt(String data); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data); } }
@Autowired
private MqttConfiguration.MyGateway myGateway;

/**
 * Demo endpoint for {@code GET /mqtt}: sends the payload {@code Test} twice through the gateway,
 * first without a topic argument and then with the topic {@code neo}.
 *
 * @param principal the caller's principal; not read by this method
 * @return an empty {@link Mono}, created after both sends have been issued
 */
@GetMapping("/mqtt")
public Mono<String> mqtt(Principal principal) {
    final String payload = "Test";
    myGateway.sendToMqtt(payload);
    myGateway.sendToMqtt("neo", payload);
    return Mono.empty();
}
/**
 * Messaging gateway whose default request channel is {@code mqttOutboundChannel}.
 *
 * <p>Each overload differs only in which headers the caller supplies. The parameter that carries
 * no {@code @Header} annotation is presumably used as the message payload by Spring Integration;
 * confirm against the gateway documentation.
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {

    /**
     * Sends {@code data} without supplying any header.
     *
     * @param data the value to send
     */
    void sendToMqtt(String data);

    /**
     * Sends {@code data} with {@code topic} bound to the {@link MqttHeaders#TOPIC} header.
     *
     * @param topic value of the topic header
     * @param data the value to send
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);

    /**
     * Sends a text payload with both the {@link MqttHeaders#TOPIC} and {@link MqttHeaders#QOS}
     * headers supplied by the caller.
     *
     * @param topic value of the topic header
     * @param qos value of the QoS header
     * @param payload the text value to send
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

    /**
     * Sends a binary payload with both the {@link MqttHeaders#TOPIC} and {@link MqttHeaders#QOS}
     * headers supplied by the caller.
     *
     * @param topic value of the topic header
     * @param qos value of the QoS header
     * @param payload the raw bytes to send
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}
<!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.mqttv5.client --> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.mqttv5.client</artifactId> <version>1.2.5</version> </dependency>
代码
package cn.netkiller.component; import com.google.gson.Gson; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.LinkedHashMap; import java.util.Map; @Component @Slf4j @Data public class MqttComponent { private final String clientId = "netkiller-" + System.currentTimeMillis(); private final Gson gson = new Gson(); private final int qos = 2; @Value("${mqtt.topic.prefix}") private String prefix; private String broker; private String username; private String password; public MqttComponent() { } public MqttComponent(@Value("${mqtt.broker}") String broker, @Value("${mqtt.username}") String username, @Value("${mqtt.password}") String password) { this.broker = broker; this.username = username; this.password = password; } public void publish(String topic, String device, String session, String content) { // Thread.currentThread().setName(this.getClass().getSimpleName()); MqttConnectionOptions options = new MqttConnectionOptions(); options.setCleanStart(false); // options.setAutomaticReconnect(true); options.setConnectionTimeout(30); options.setKeepAliveInterval(20); MemoryPersistence persistence = new MemoryPersistence(); try { if (username != null) { options.setUserName(username); options.setPassword(password.getBytes()); } MqttAsyncClient client = new MqttAsyncClient(broker, clientId, persistence); IMqttToken token = client.connect(options); token.waitForCompletion(20000L); if (token.isComplete()) { log.debug("Connecting to broker: {} username: {} password: {} ", broker, username, password); // log.debug("Auth username: {} password: {} Connected!", username, password); topic = prefix.concat("/".concat(device).concat("/").concat(topic)); String jsonString = gson.toJson(Map.of("session", session, "data", content), LinkedHashMap.class); byte[] payload = jsonString.getBytes(); 
// if (client == null || !client.isConnected()) { // this.connect(); // } try { if (client.isConnected()) { MqttMessage message = new MqttMessage(payload); message.setQos(qos); token = client.publish(topic, message); token.waitForCompletion(30000L); if (token.isComplete()) { // log.info(String.format("Published topic: %s, message: %s", topic, message)); log.info(String.format("Publishing topic: %s, message: %s", topic, message)); } } } finally { client.close(); } } } catch (MqttException e) { log.debug("Mqtt reason: " + e.getReasonCode() + ", cause: " + e.getCause() + ", msg: " + e.getMessage()); } } public String topic(String prefix, String device, String service) { if (prefix == null) { prefix = this.prefix; } return String.format("%s/%s/%s", prefix, device, service); } public String message(int sequence, String session, String segment, String audio, String state) { DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); LocalDateTime dateTime = LocalDateTime.now(); String formattedDateTime = dateTime.format(formatter); String jsonString = gson.toJson(Map.of("sequence", sequence, "session", session, "segment", segment, "audio", audio, "time", formattedDateTime, "state", state), LinkedHashMap.class); return jsonString; } public String message(String session, String content) { // DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); // LocalDateTime dateTime = LocalDateTime.now(); // String formattedDateTime = dateTime.format(formatter); String jsonString = gson.toJson(Map.of("session", session, "data", content), LinkedHashMap.class); return jsonString; } public void publish(String topic, String message) { // Thread.currentThread().setName(this.getClass().getSimpleName()); MqttConnectionOptions options = new MqttConnectionOptions(); options.setCleanStart(false); // options.setAutomaticReconnect(true); options.setConnectionTimeout(30); options.setKeepAliveInterval(20); MemoryPersistence persistence = new 
MemoryPersistence(); try { if (username != null) { options.setUserName(username); options.setPassword(password.getBytes()); } MqttAsyncClient client = new MqttAsyncClient(broker, clientId, persistence); IMqttToken token = client.connect(options); token.waitForCompletion(20000L); if (token.isComplete()) { log.debug("Connecting to broker: {} username: {} password: {} ", broker, username, password); byte[] payload = message.getBytes(); try { if (client.isConnected()) { MqttMessage mqttMessage = new MqttMessage(payload); mqttMessage.setQos(qos); token = client.publish(topic, mqttMessage); token.waitForCompletion(30000L); if (token.isComplete()) { log.info(String.format("Publishing topic: %s, message: %s", topic, mqttMessage)); } } } finally { client.close(); } } } catch (MqttException e) { log.debug("Mqtt reason: " + e.getReasonCode() + ", cause: " + e.getCause() + ", msg: " + e.getMessage()); } } public void publish(String topic, String session, String content) { this.publish(topic, this.message(session, content)); } }