Local broker (MQTT) based Publish/Subscribe using Android Application

In my current project, I set up a local-broker-based MQTT implementation by following this link. The local-broker-based MQTT implementation works perfectly.

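Initially, I used a Node.js publisher and subscriber with this local broker to exchange messages. This also works without an internet connection, because the devices are connected to the same network; the MQTT API for the local-broker-based implementation looks like http://192.168.0.105.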

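Now I want to replace the Node.js subscriber with an Android application. My Android application uses the MQTT Paho library for communication and the MQTT API mqtt://test.mosquitto.org:1883. If I use the open MQTT broker, i.e. mosquitto, then both the Node.js publisher and the Android subscriber work fine (this is the mosquitto-based MQTT setup using the API mqtt://test.mosquitto.org:1883).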

Now I want to use the local-broker-based implementation in the Android application, so I have to replace the mqtt://test.mosquitto.org:1883 API with http://192.168.0.105. When I use the http://192.168.0.105 API in the Android code and deploy it to a mobile device, the app shows "Unfortunately, Android Application has stopped".

How can I use the local-broker-based implementation in the Android application?

I am also sharing the Node.js publisher and subscriber code.

Node.js publisher (local-broker-based implementation)

var mqtt = require('mqtt');
var client = mqtt.connect('http://192.168.0.105');

// Publish a random temperature reading every 5 seconds.
setInterval(function() {
    var data = {
        "tempValue": Math.random(),
        "unitOfMeasurement": 'C'
    };
    client.publish('tempMeasurement', JSON.stringify(data));
}, 5000);

Node.js subscriber (local-broker-based implementation)

var mqtt = require('mqtt');
var client = mqtt.connect('http://192.168.0.105');
client.subscribe('tempMeasurement');

// Print every message that arrives on the tempMeasurement topic.
client.on('message', function(topic, payload) {
    if (topic.toString() == "tempMeasurement") {
        console.log("Message Received " + payload.toString());
    }
});

For more clarity, here is the MQTT subscriber code:

package iotsuite.pubsubmiddleware;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence;

public class MQTTSubscriber {

// URI for open MQTT broker
// public static final String BROKER_URL = "tcp://test.mosquitto.org:1883";
public static final String BROKER_URL = "mqtt://192.168.0.105";

private MqttClient client;

public MQTTSubscriber(PubSubMiddleware pubsub) {
    try {
        client = new MqttClient(BROKER_URL, MqttClient.generateClientId(),
                new MemoryPersistence());

        client.setCallback(new PushCallback(pubsub));
        client.connect();

    } catch (MqttException e) {
        e.printStackTrace();
    }
}

public void subscribe(String topicName) throws MqttException {

    client.subscribe(topicName);

    System.out.println("Subscribed. Topic: " + topicName);
}

}

I subscribe to tempMeasurement with the following code:

package framework;

 import iotsuite.common.Logger;
 import iotsuite.pubsubmiddleware.PubSubMiddleware;
 import iotsuite.pubsubmiddleware.Subscriber;
 import iotsuite.semanticmodel.Device;
 import android.util.Log;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;

 public abstract class SmartHomeApp implements Runnable, Subscriber {

protected final PubSubMiddleware myPubSubMiddleware;
protected final Device myDeviceInfo;
Gson gson = new Gson();

public SmartHomeApp(PubSubMiddleware pubSubM, Device deviceInfo) {

    this.myPubSubMiddleware = pubSubM;
    this.myDeviceInfo = deviceInfo;
    postInitialize();

}

protected void postInitialize() {
    subscribeDisplayTemp();

}

@Override
public void notifyReceived(String eventName, Object arg) {
    try {

        if (eventName.equals("tempMeasurement")) {
            Logger.log(myDeviceInfo.getName(), "TempMonitoringApp",
                    "Notification Received tempMeasurement ");
            JsonObject jsonObject = new JsonParser().parse(arg.toString())
                    .getAsJsonObject();
            double tempValue = jsonObject.get("tempValue").getAsDouble();
            // double yahootempValue =
            // jsonObject.get("yahootempValue").getAsDouble();
            TempStruct tempStruct = new TempStruct(tempValue, "C");
            System.out.println("tempValue is " + tempValue
                    + " in framework");
            Log.i("tempValue", "tempValue is received in framework");
            onNewDisplayTempNotify(tempStruct);
        }

    } catch (Exception e) {
        e.printStackTrace();
    }
}

@Override
public void run() {
}

public abstract void onNewDisplayTempNotify(TempStruct arg);

public void subscribeDisplayTemp() {

    this.myPubSubMiddleware.subscribe(this, "tempMeasurement");
}

}

With the code above, I want to display the tempValue received from the Node.js code in the Android application. The error from Android is as follows:

01-04 23:03:26.786: E/AndroidRuntime(21793): FATAL EXCEPTION: main
01-04 23:03:26.786: E/AndroidRuntime(21793): Process: com.example.android,      PID: 21793
01-04 23:03:26.786: E/AndroidRuntime(21793): java.lang.RuntimeException:    Unable to start activity         ComponentInfo{com.example.android/com.example.android.MainActivity}:       java.lang.IllegalArgumentException
01-04 23:03:26.786: E/AndroidRuntime(21793):    at    android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2426)
01-04 23:03:26.786: E/AndroidRuntime(21793):    at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:2490)
01-04 23:03:26.786: E/AndroidRuntime(21793):    at android.app.ActivityThread.-wrap11(ActivityThread.java)
01-04 23:03:26.786: E/AndroidRuntime(21793):    at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1354)
01-04 23:03:26.786: E/AndroidRuntime(21793):    at android.os.Handler.dispatchMessage(Handler.java:102)
01-04 23:03:26.786: E/AndroidRuntime(21793):    at android.os.Looper.loop(Looper.java:148)
01-04 23:03:26.786: E/AndroidRuntime(21793):    at android.app.ActivityThread.main(ActivityThread.java:5443)
01-04 23:03:26.786: E/AndroidRuntime(21793):    at java.lang.reflect.Method.invoke(Native Method)
01-04 23:03:26.786: E/AndroidRuntime(21793):    at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:728)
01-04 23:03:26.786: E/AndroidRuntime(21793):    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:618)
01-04 23:03:26.786: E/AndroidRuntime(21793): Caused by: java.lang.IllegalArgumentException
01-04 23:03:26.786: E/AndroidRuntime(21793):    at  org.eclipse.paho.client.mqttv3.MqttClient.validateURI(MqttClient.java:204)
01-04 23:03:26.786: E/AndroidRuntime(21793):    at org.eclipse.paho.client.mqttv3.MqttClient.<init>(MqttClient.java:175)
01-04 23:03:26.786: E/AndroidRuntime(21793):    at iotsuite.pubsubmiddleware.MQTTPublisher.<init>(MQTTPublisher.java:21)
01-04 23:03:26.786: E/AndroidRuntime(21793):    at iotsuite.pubsubmiddleware.PubSubMiddleware.<init>(PubSubMiddleware.java:58)
01-04 23:03:26.786: E/AndroidRuntime(21793):    at      iotsuite.pubsubmiddleware.IoTSuiteFactory.getInstance(IoTSuiteFactory.java:16)
01-04 23:03:26.786: E/AndroidRuntime(21793):    at  sim.deviceD9.Startup.setUpNode(Startup.java:25)
01-04 23:03:26.786: E/AndroidRuntime(21793):    at sim.deviceD9.Startup.startDevice(Startup.java:63)
01-04 23:03:26.786: E/AndroidRuntime(21793):    at com.example.android.MainActivity.onCreate(MainActivity.java:21)
01-04 23:03:26.786: E/AndroidRuntime(21793):    at android.app.Activity.performCreate(Activity.java:6259)
01-04 23:03:26.786: E/AndroidRuntime(21793):    at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java:1130)
01-04 23:03:26.786: E/AndroidRuntime(21793):    at android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2379)
01-04 23:03:26.786: E/AndroidRuntime(21793):    ... 9 more

The problem is that your URI uses the http:// scheme. http:// is for connecting to an HTTP server, not an MQTT broker.

You need to use a tcp:// URI for Android, e.g. tcp://192.168.0.105 (this will probably work for Node.js as well, but mqtt:// definitely works for Node.js):

mqtt://192.168.0.105

var mqtt = require('mqtt');
var client = mqtt.connect('mqtt://192.168.0.105');

// Publish a random temperature reading every 5 seconds.
setInterval(function() {
    var data = {
        "tempValue": Math.random(),
        "unitOfMeasurement": 'C'
    };
    client.publish('tempMeasurement', JSON.stringify(data));
}, 5000);
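
On the Android side, the same change means handing Paho a tcp:// address. The following is only a minimal sketch using the standard library: BrokerUri and toTcpUri are hypothetical names, not part of Paho, and the sketch assumes the local broker listens on the standard unencrypted MQTT port 1883 when no port is given.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper (not part of Paho): rewrites a broker address to tcp://.
final class BrokerUri {

    // Assumption: these schemes all point at a plain, unencrypted MQTT broker.
    private static final Set<String> REWRITABLE =
            new HashSet<>(Arrays.asList("tcp", "mqtt", "http"));

    // Assumption: the broker uses the standard unencrypted MQTT port.
    static final int DEFAULT_PORT = 1883;

    static String toTcpUri(String address) {
        URI uri;
        try {
            uri = new URI(address);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Not a valid URI: " + address, e);
        }
        String scheme = uri.getScheme();
        String host = uri.getHost();
        // Reject anything without a host or with a scheme we do not know how to map.
        if (scheme == null || host == null
                || !REWRITABLE.contains(scheme.toLowerCase(Locale.ROOT))) {
            throw new IllegalArgumentException("Unsupported broker address: " + address);
        }
        // Keep an explicit port; otherwise fall back to the assumed default.
        int port = uri.getPort() == -1 ? DEFAULT_PORT : uri.getPort();
        return "tcp://" + host + ":" + port;
    }

    public static void main(String[] args) {
        System.out.println(toTcpUri("http://192.168.0.105"));
        System.out.println(toTcpUri("mqtt://test.mosquitto.org:1883"));
    }
}
```

The returned string can then replace BROKER_URL in the MqttClient constructor call from the question. Note that the stack trace points at the MQTTPublisher constructor, so the publisher's URL presumably needs the same change. Also, IllegalArgumentException is unchecked, so the catch (MqttException e) block in the question does not catch it, which is why the activity crashes on startup.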