android高级|android集成MQTT(最新)

1 集成MQTT android高级|android集成MQTT(最新)
文章图片

implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0' implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

2 添加清单
android高级|android集成MQTT(最新)
文章图片

:name="org.eclipse.paho.android.service.MqttService" /> :name=".ui.base.MyMqttService" />

3 MyMqttService类
package com.sbas.bigdata.ui.base; import android.app.Service; import android.content.Context; import android.content.Intent; import android.net.ConnectivityManager; import android.net.NetworkInfo; import android.os.Build; import android.os.Handler; import android.os.IBinder; import android.util.Log; import android.widget.Toast; import androidx.annotation.Nullable; import androidx.annotation.RequiresApi; import org.eclipse.paho.android.service.MqttAndroidClient; import org.eclipse.paho.client.mqttv3.IMqttActionListener; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; /** * Created by siboasi on 2021/7/17. */public class MyMqttService extends Service {@Nullable @org.jetbrains.annotations.Nullable @Override public IBinder onBind(Intent intent) { return null; } public final String TAG = "tag"; private static MqttAndroidClient mqttAndroidClient; private MqttConnectOptions mMqttConnectOptions; publicString HOST= "tcp://39.108.103.224:1885"; //服务器地址(协议+地址+端口号) publicString USERNAME= "admin"; //用户名 publicString PASSWORD= "admin"; //密码 public static String PUBLISH_TOPIC= "/SS/BSKT/K2100AI/0012346/RX"; //发布主题 public static String RESPONSE_TOPIC = "/SS/BSKT/K2100AI/0012346/TX"; //响应主题 publicStringuserid = "QQ448656823"; //用户ID,唯一标识符@RequiresApi(api = Build.VERSION_CODES.O) @Override public int onStartCommand(Intent intent, int flags, int startId) { init(); return super.onStartCommand(intent, flags, startId); } /** * 开启服务 */ public static void startService(Context mContext) { mContext.startService(new Intent(mContext, MyMqttService.class)); }/** * 发布 (模拟其他客户端发布消息) * * @param message 消息 */ public static void publish(String message) { String topic = PUBLISH_TOPIC; Integer qos = 2; Boolean retained = false; try { //参数分别为:主题、消息的字节数组、服务质量、是否在服务器保留断开连接后的最后一条消息 mqttAndroidClient.publish(topic, message.trim().getBytes(), qos.intValue(), retained.booleanValue()); Log.i("tag",PUBLISH_TOPIC +":"+message); } catch (MqttException e) { e.printStackTrace(); } }/** * 响应 (收到其他客户端的消息后,响应给对方告知消息已到达或者消息有问题等) * * @param message 消息 */ public void response(String message) { String topic = RESPONSE_TOPIC; Integer qos = 2; Boolean retained = false; try { //参数分别为:主题、消息的字节数组、服务质量、是否在服务器保留断开连接后的最后一条消息 mqttAndroidClient.publish(topic, message.getBytes(), qos.intValue(), retained.booleanValue()); } catch (MqttException e) { e.printStackTrace(); } }/** * 初始化 */ @RequiresApi(api = Build.VERSION_CODES.O) private void init() { String serverURI = HOST; //服务器地址(协议+地址+端口号) mqttAndroidClient = new MqttAndroidClient(this, serverURI, userid); mqttAndroidClient.setCallback(mqttCallback); //设置监听订阅消息的回调 mMqttConnectOptions = new MqttConnectOptions(); mMqttConnectOptions.setCleanSession(true); //设置是否清除缓存 mMqttConnectOptions.setConnectionTimeout(60); //设置超时时间,单位:秒 mMqttConnectOptions.setKeepAliveInterval(60); //设置心跳包发送间隔,单位:秒 mMqttConnectOptions.setUserName(USERNAME); //设置用户名 mMqttConnectOptions.setPassword(PASSWORD.toCharArray()); //设置密码 // last will message boolean doConnect = true; String disconnect = "{\"disconnect\":\"" + userid + "\"}"; String topic = PUBLISH_TOPIC; Integer qos = 2; //最好不要为0可以是1 2 Boolean retained = false; if ((!disconnect.equals(""))) { // 断开上一次的连接。没有的话可以去掉 try { mMqttConnectOptions.setWill(topic, disconnect.getBytes(), qos.intValue(), retained.booleanValue()); } catch (Exception e) { Log.i(TAG, "Exception Occured", e); doConnect = false; iMqttActionListener.onFailure(null, e); } } if (doConnect) { doClientConnection(); } }/** * 连接MQTT服务器 */ private void doClientConnection() { if (!mqttAndroidClient.isConnected() && isConnectIsNomarl()) { try { mqttAndroidClient.connect(mMqttConnectOptions, null, iMqttActionListener); } catch (MqttException e) { e.printStackTrace(); } } }/** * 判断网络是否连接 */ private boolean isConnectIsNomarl() { ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE); NetworkInfo info = connectivityManager.getActiveNetworkInfo(); if (info != null && info.isAvailable()) { String name = info.getTypeName(); Log.i(TAG, "当前网络名称:" + name); return true; } else { Log.i(TAG, "没有可用网络"); /*没有可用网络的时候,延迟3秒再尝试重连*/ new Handler().postDelayed(new Runnable() { @Override public void run() { doClientConnection(); } }, 3000); return false; } }//MQTT是否连接成功的监听 private IMqttActionListener iMqttActionListener = new IMqttActionListener() { @Override public void onSuccess(IMqttToken arg0) { Log.i(TAG, "-----------连接成功 "); try { mqttAndroidClient.subscribe(RESPONSE_TOPIC, 1); //订阅主题,参数:主题、服务质量qos最好不要为0 Log.i(TAG, "--------------订阅成功 "); } catch (MqttException e) { e.printStackTrace(); } } @Override public void onFailure(IMqttToken arg0, Throwable arg1) { arg1.printStackTrace(); Log.i(TAG, "连接失败 "); doClientConnection(); //连接失败,重连(可关闭服务器进行模拟) } }; //订阅主题的回调 private MqttCallback mqttCallback = new MqttCallback() {@Override public void messageArrived(String topic, MqttMessage message) throws Exception { Log.i(TAG, "收到消息: "+ new String(message.getPayload()) ); //收到其他客户端的消息后,响应给对方告知消息已到达或者消息有问题等看你的需求,要不要回应。 //response("11111 arrived"); }@Override public void deliveryComplete(IMqttDeliveryToken token) {} @Override public void connectionLost(Throwable arg0) { Log.i(TAG, "连接断开 "); doClientConnection(); //连接断开,重连 }}; @Override public void onDestroy() { try { mqttAndroidClient.disconnect(); //断开连接 } catch (MqttException e) { e.printStackTrace(); } super.onDestroy(); } }

【android高级|android集成MQTT(最新)】4 在Activity 启动 MyMqttService
MyMqttService.startService(this); //开启服务
5 在你要发送消息的地方,发布消息给MQTT
MyMqttService.publish(“要发送的消息”);
6 在MyMqttService观察MQTT订阅的消息
android高级|android集成MQTT(最新)
文章图片

android高级|android集成MQTT(最新)
文章图片

    推荐阅读