NServiceBus 和 IoT - 将来自客户端的消息直接放入队列中,并在服务器端使用 NServiceBus 进行处理

NServiceBus & IoT - place a message from client directly in queue and process with NServiceBus at server side

Out 后端是使用 NServiceBus(版本 5)在 .NET 中编写的。到目前为止,我们的客户端(.NET 客户端和 c++ 客户端)正在向具有 NServiceBus 的 WCF 服务发送消息,并且 WCF 服务将 NServiceBus 消息发送给正确的工作人员。

现在我们正在对我们的体系结构进行一些更改,并希望一些客户端直接将消息放入队列并跳过 WCF。我们有很多设备(大多数是用 c++ 编写的)。

我知道 NServiceBus 用它自己的对象包装它放入队列中的消息。我的问题是 NServiceBus 是否有任何方法可以在 IoT 环境中工作 - 设备将消息直接放入队列中,中间没有适配器?或者让服务器端(worker)处理 "regular" 没有用 NServiceBus 对象包装的消息?

没有人能保证传输消息的格式。我建议您远离排队消息的想法,并期望 NServiceBus 直接使用它们。

取而代之的是,让 broke 接收来自设备的消息,这将根据需要将它们重新发送到 NServiceBus。因此,设备通常会发出优化的消息,例如,适合尽可能少的 TCP 帧。 NServiceBus 不关心这个。使用 Json 和 XML 序列化程序检查传输消息 - 它们很大。

我使用 AQMP 在我的事件中心和设备之间进行消息交换。然后,在事件中心,我可以做我想做的事,包括使用 NServiceBus。考虑行业对设备的建议。 AQMP 可能就是您想要的。

阿列克谢, 您的建议不是为每条消息添加另一个跃点。如果是这种情况,为什么不直接从 IoT 设备到云进行 HTTP 调用。 HTTP 处理程序将使用 NServiceBus 客户端库将其放入队列中。通过使用 HTTP 处理程序,将来我将有一个更好的选择来更改内部实现(因此队列)。

如果您只是想向目的地发送消息或发布活动,您没有提到您的交通工具是什么?其中一些选择可能会改变此处的答案,但您应该了解其中的要点。

可以通过将消息放入队列来直接与 NServiceBus 集成。如果你看documentation you'll see there is 'Scripting' section under each transport which shows you how to put messages directly into the queue. If you want to integrate with MSMQ you can find the documentation page here

NSB 消息带有 Headers. Most of the values in the header is optional and they come with a sensible default values so all you really need is the message type (the actual type name for the message payload). Assuming you want to do a Send, you can see all the headers that's at play here。同样,您不需要所有这些。

回答您的问题:要从您的 C++ 代码与 NSB 集成,您可以将此 C# 代码转换为 C++,这就是您所需要的:

public static void SendMessage(string queuePath, string messageBody, List<HeaderInfo> headers)
{
    using (var scope = new TransactionScope())
    {
        using (var queue = new MessageQueue(queuePath))
        using (Message message = new Message())
        {
            message.BodyStream = new MemoryStream(Encoding.UTF8.GetBytes(messageBody));
            message.Extension = CreateHeaders(headers);
            queue.Send(message, MessageQueueTransactionType.Automatic);
        }
        scope.Complete();
    }
}

public static byte[] CreateHeaders(List<HeaderInfo> headerInfos)
{
    XmlSerializer serializer = new XmlSerializer(typeof(List<HeaderInfo>));
    using (var stream = new MemoryStream())
    {
        serializer.Serialize(stream, headerInfos);
        return stream.ToArray();
    }
}

public class HeaderInfo
{
    public string Key { get; set; }
    public string Value { get; set; }
}

注意事项:

  • MSMQ 与 TransactionScope 配合使用。我不认为物联网是一个选项,具体取决于您使用的设备。
  • 您仍然需要一个库才能将消息放入 MSMQ。

如果您选择了 SQL 传输,它会使此集成变得更加容易,因为您所要做的就是在数据库中写入一条记录(您还可以找到 here).