Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

25.3. Android Mqtt v5 例子

		
package cn.netkiller.ropeway.service;

import android.app.Service;
import android.content.Intent;
import android.os.IBinder;
import android.util.Log;

import org.eclipse.paho.mqttv5.client.IMqttDeliveryToken;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

public class MyService extends Service {
    MqttAsyncClient mqttAsyncClient;
    IMqttToken token;

    String topic = "/netkiller/test";
    String content = "Helloworld!!!";
    int qos = 2;
    String broker = "tcp://broker.emqx.io:1883";
    String clientId = "JavaSample" + System.currentTimeMillis();

    public MyService() {
        try {
            MemoryPersistence persistence = new MemoryPersistence();
            mqttAsyncClient = new MqttAsyncClient(broker, clientId, persistence);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
        }
    }

    @Override
    public IBinder onBind(Intent intent) {
        // TODO: Return the communication channel to the service.
        throw new UnsupportedOperationException("Not yet implemented");
    }

    @Override
    public void onCreate() {
        super.onCreate();
        Log.d("Service", "onCreate() executed");
        try {
            MqttConnectionOptions mqttConnectionOptions = new MqttConnectionOptions();
            mqttConnectionOptions.setCleanStart(false);
            mqttConnectionOptions.setAutomaticReconnect(true);

            Log.d("Service", "Connecting to broker: " + broker);

            token = mqttAsyncClient.connect(mqttConnectionOptions);
            token.waitForCompletion();
            if (token.isComplete()) {
                Log.d("Service", "Connected");
                mqttAsyncClient.subscribe("/netkiller/message", qos);
            }

        } catch (MqttException e) {
            throw new RuntimeException(e);
        }

        mqttAsyncClient.setCallback(new MqttCallback() {

            @Override
            public void disconnected(MqttDisconnectResponse disconnectResponse) {

            }

            @Override
            public void mqttErrorOccurred(MqttException exception) {

            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                String msg = new String(message.getPayload());
                Log.d("Service", String.format("接收消息 Id:%s, Topic: %s, QoS: %s, Message: %s, ", message.getId(), topic, message.getQos(), message.toString()));
            }

            @Override
            public void deliveryComplete(IMqttToken token) {

            }

            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
//                if (reconnect) {
//                    try {
//                        mqttAsyncClient.subscribe("/netkiller/message", qos);
//                    } catch (MqttException e) {
//                        throw new RuntimeException(e);
//                    }
//                }
            }

            @Override
            public void authPacketArrived(int reasonCode, MqttProperties properties) {

            }


            public void deliveryComplete(IMqttDeliveryToken arg0) {
                try {
                    System.out.println(arg0.getMessage());
                } catch (MqttException e1) {
                    e1.printStackTrace();
                }

            }


            public void connectionLost(Throwable err) {
                System.out.println("连接丢失");
                System.out.println(err.getMessage());

            }
        });


    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        Log.d("Service", "onStartCommand() executed");

        try {
            Log.d("Service", "Publishing message: " + content);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            token = mqttAsyncClient.publish(topic, message);
            token.waitForCompletion();
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }

        return super.onStartCommand(intent, flags, startId);
    }

    @Override
    public void onDestroy() {
        super.onDestroy();
        try {
            if (mqttAsyncClient.isConnected()) {
                mqttAsyncClient.close();
                Log.d("Service", "Close client.");
            }
        } catch (MqttException e) {
            Log.d("Service", "Disconnected");
        }
        Log.d("Service", "onDestroy() executed");
    }
}