M2MQTT 客户端在没有异常或错误消息的情况下断开连接
M2MQTT client disconnecting without an exception or error message
我正在尝试创建一个包含各种主题的 API。
为此,我尝试对事物进行多线程处理,以便稍后可以将整个事物扩展为多个 APIs,但这不是重点。
我正在使用 ASP.net Core 4.0,如果这与它有任何关系的话。 Entity Framework 还有。
我的问题是因为我与 Mosquitto 服务器的连接在一分钟左右后断开,但没有抛出异常或类似问题。消息有多大或交换了多少并不重要。我不知道如何创建回调或任何类似的东西来了解我的连接发生了什么。有人可以帮忙吗?
我将 link 用于建立连接和订阅连接的代码。使用 Subscribe 方法或手动执行也不会改变任何内容。我在这里不知所措。
提前致谢!
Main.cs:
Task.Factory.StartNew(() => DataflowController.ResumeQueuesAsync());
BuildWebHost(args).Run();
DataflowController.cs:
public static Boolean Subscribe(String topic)
{
Console.WriteLine("Hello from " + topic);
MqttClient mqttClient = new MqttClient(brokerAddress);
byte code = mqttClient.Connect(Guid.NewGuid().ToString());
// Register to message received
mqttClient.MqttMsgPublishReceived += client_recievedMessageAsync;
string clientId = Guid.NewGuid().ToString();
mqttClient.Connect(clientId);
// Subscribe to topic
mqttClient.Subscribe(new String[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
System.Console.ReadLine();
return true;
}
public static async Task ResumeQueuesAsync()
{
var mongoClient = new MongoClient(connectionString);
var db = mongoClient.GetDatabase(databaseName);
var topics = db.GetCollection<BsonDocument>(topicCollection);
var filter = new BsonDocument();
List<BsonDocument> result = topics.Find(filter).ToList();
var resultSize = result.Count;
Task[] subscriptions = new Task[resultSize];
MqttClient mqttClient = new MqttClient(brokerAddress);
byte code = mqttClient.Connect(Guid.NewGuid().ToString());
// Register to message received
mqttClient.MqttMsgPublishReceived += client_recievedMessageAsync;
string clientId = Guid.NewGuid().ToString();
mqttClient.Connect(clientId);
int counter = 0;
foreach(var doc in result)
{
subscriptions[counter] = new Task(() =>
{
Console.WriteLine("Hello from " + doc["topic"].ToString());
// Subscribe to topic
mqttClient.Subscribe(new String[] { doc["topic"].ToString() }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
System.Console.ReadLine();
});
counter++;
}
foreach(Task task in subscriptions)
{
task.Start();
}
}
static async void client_recievedMessageAsync(object sender, MqttMsgPublishEventArgs e)
{
// Handle message received
var message = System.Text.Encoding.Default.GetString(e.Message);
var topic = e.Topic;
var id = topic.Split("/")[2];
BsonDocument doc = new BsonDocument {
{"Plug ID", id },
{"Consumption", message }
};
await Save(doc, "smartPDM_consumption");
System.Console.WriteLine("Message received from " + topic + " : " + message);
}
这一行是问题所在:
byte code = mqttClient.Connect(Guid.NewGuid().ToString());
删除了它,它刚刚起作用。
我正在尝试创建一个包含各种主题的 API。 为此,我尝试对事物进行多线程处理,以便稍后可以将整个事物扩展为多个 APIs,但这不是重点。
我正在使用 ASP.net Core 4.0,如果这与它有任何关系的话。 Entity Framework 还有。
我的问题是因为我与 Mosquitto 服务器的连接在一分钟左右后断开,但没有抛出异常或类似问题。消息有多大或交换了多少并不重要。我不知道如何创建回调或任何类似的东西来了解我的连接发生了什么。有人可以帮忙吗?
我将 link 用于建立连接和订阅连接的代码。使用 Subscribe 方法或手动执行也不会改变任何内容。我在这里不知所措。
提前致谢!
Main.cs:
Task.Factory.StartNew(() => DataflowController.ResumeQueuesAsync());
BuildWebHost(args).Run();
DataflowController.cs:
public static Boolean Subscribe(String topic)
{
Console.WriteLine("Hello from " + topic);
MqttClient mqttClient = new MqttClient(brokerAddress);
byte code = mqttClient.Connect(Guid.NewGuid().ToString());
// Register to message received
mqttClient.MqttMsgPublishReceived += client_recievedMessageAsync;
string clientId = Guid.NewGuid().ToString();
mqttClient.Connect(clientId);
// Subscribe to topic
mqttClient.Subscribe(new String[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
System.Console.ReadLine();
return true;
}
public static async Task ResumeQueuesAsync()
{
var mongoClient = new MongoClient(connectionString);
var db = mongoClient.GetDatabase(databaseName);
var topics = db.GetCollection<BsonDocument>(topicCollection);
var filter = new BsonDocument();
List<BsonDocument> result = topics.Find(filter).ToList();
var resultSize = result.Count;
Task[] subscriptions = new Task[resultSize];
MqttClient mqttClient = new MqttClient(brokerAddress);
byte code = mqttClient.Connect(Guid.NewGuid().ToString());
// Register to message received
mqttClient.MqttMsgPublishReceived += client_recievedMessageAsync;
string clientId = Guid.NewGuid().ToString();
mqttClient.Connect(clientId);
int counter = 0;
foreach(var doc in result)
{
subscriptions[counter] = new Task(() =>
{
Console.WriteLine("Hello from " + doc["topic"].ToString());
// Subscribe to topic
mqttClient.Subscribe(new String[] { doc["topic"].ToString() }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
System.Console.ReadLine();
});
counter++;
}
foreach(Task task in subscriptions)
{
task.Start();
}
}
static async void client_recievedMessageAsync(object sender, MqttMsgPublishEventArgs e)
{
// Handle message received
var message = System.Text.Encoding.Default.GetString(e.Message);
var topic = e.Topic;
var id = topic.Split("/")[2];
BsonDocument doc = new BsonDocument {
{"Plug ID", id },
{"Consumption", message }
};
await Save(doc, "smartPDM_consumption");
System.Console.WriteLine("Message received from " + topic + " : " + message);
}
这一行是问题所在:
byte code = mqttClient.Connect(Guid.NewGuid().ToString());
删除了它,它刚刚起作用。