从 nodejs 服务器接收来自 ServiceBus 的 BrokeredMessage
Receiving BrokeredMessage from ServiceBus from a nodejs server
我正在从新的 nodejs 服务器读取来自现有 Azure ServiceBus 的消息。
这是将消息从 .NET 服务器发送到 ServiceBus 的方式:
var topicClient = TopicClient.CreateFromConnectionString(serviceBusConnectionString, routingKey);
var brokeredMessage = new BrokeredMessage(message.ToJson());
topicClient.Send(brokeredMessage);
其中 topicClient 是 Microsoft.ServiceBus.Messaging.TopicClient
我正在使用以下方法使用 azure-sb 包读取 nodejs 服务器上的消息:
sbClient = ServiceBusClient.createFromConnectionString(connectionString)
subscriptionClient = this.sbClient.createSubscriptionClient(topicName, subscriptionName);
receiver = this.subscriptionClient.createReceiver(ReceiveMode.receiveAndDelete);
messages = await this.receiver.receiveMessages(10,10);
console.log(messages.map(message => { message.body }));
message.body 是一个缓冲区,当我这样做时 message.body.toString('utf-8') 我得到类似的东西:
@string3http://schemas.microsoft.com/2003/10/Serialization/��{VALID JSON}
我当然对中间的有效 JSON 感兴趣。
在 .net 服务器中,我们只需执行 brokeredMessage.GetBody() 即可获得对象,那么在 nodejs 上是否有一种简单的方法可以做到这一点?
根据我的测试,如果我们使用标准库Microsoft.Azure.ServiceBus
在.Net应用中发送消息,它会直接JSON解析节点应用中的消息
例如
这是我发送消息的 C# 代码:
class Program
{
static void Main(string[] args)
{
string connectionString = "Endpoint=sb://...";
var client = new TopicClient(connectionString, "");
var payload = JsonConvert.SerializeObject(new DemoMessage() { Title = $"hello!!! {DateTime.Now}" });
var serviceBusMessage = new Message(Encoding.UTF8.GetBytes(payload));
serviceBusMessage.SessionId = Guid.NewGuid().ToString("D");
client.SendAsync(serviceBusMessage).Wait();
}
private class DemoMessage
{
public DemoMessage()
{
}
public string Title { get; set; }
}
这是我的 Node.js 接收消息的代码:
const { ServiceBusClient, ReceiveMode } = require("@azure/service-bus");
// Define connection string and related Service Bus entity names here
const connectionString =
"Endpoint=sb://";
const topicName = "***";
const subscriptionName = "***";
async function main() {
const sbClient = ServiceBusClient.createFromConnectionString(
connectionString,
);
const subscriptionClient = sbClient.createSubscriptionClient(
topicName,
subscriptionName,
);
const receiver = subscriptionClient.createReceiver(ReceiveMode.receiveAndDelete);
try {
const messages = await receiver.receiveMessages(1);
console.log("Received messages:");
console.log(messages.map((message) => message.body));
await subscriptionClient.close();
} finally {
await sbClient.close();
}
}
main().catch((err) => {
console.log("Error occurred: ", err);
});
此外,如果你还使用库WindowsAzure.ServiceBus
, we need to use BrokeredMessage(Stream messageBodyStream, bool ownsStream)
来初始化一个对象
因为我们使用BrokeredMessage(platload)
来初始化,它会使用DataContractSerializer和一个二进制的XmlDictionaryWriter来初始化一个对象。因此,负载正在使用带有二进制 XmlDictionaryWriter 的 DataContractSerializer 进行序列化,这就是为什么消息正文在其开头具有类型指示 @string3http://schemas.microsoft.com/2003/10/Serialization/
.
的原因
例如
这是我发送消息的 C# 代码:
var client =TopicClient.CreateFromConnectionString(connectionString, "test");
var payload = JsonConvert.SerializeObject(new DemoMessage() { Title = $"hello BrokeredMessage!!! {DateTime.Now}" });
using (Stream stream = new MemoryStream(Encoding.UTF8.GetBytes(payload))) {
var serviceBusMessage = new BrokeredMessage(stream,true);
await client.SendAsync(serviceBusMessage);
}
我用同样的代码接收
详情请参考here。
我正在从新的 nodejs 服务器读取来自现有 Azure ServiceBus 的消息。
这是将消息从 .NET 服务器发送到 ServiceBus 的方式:
var topicClient = TopicClient.CreateFromConnectionString(serviceBusConnectionString, routingKey);
var brokeredMessage = new BrokeredMessage(message.ToJson());
topicClient.Send(brokeredMessage);
其中 topicClient 是 Microsoft.ServiceBus.Messaging.TopicClient
我正在使用以下方法使用 azure-sb 包读取 nodejs 服务器上的消息:
sbClient = ServiceBusClient.createFromConnectionString(connectionString)
subscriptionClient = this.sbClient.createSubscriptionClient(topicName, subscriptionName);
receiver = this.subscriptionClient.createReceiver(ReceiveMode.receiveAndDelete);
messages = await this.receiver.receiveMessages(10,10);
console.log(messages.map(message => { message.body }));
message.body 是一个缓冲区,当我这样做时 message.body.toString('utf-8') 我得到类似的东西:
@string3http://schemas.microsoft.com/2003/10/Serialization/��{VALID JSON}
我当然对中间的有效 JSON 感兴趣。 在 .net 服务器中,我们只需执行 brokeredMessage.GetBody() 即可获得对象,那么在 nodejs 上是否有一种简单的方法可以做到这一点?
根据我的测试,如果我们使用标准库Microsoft.Azure.ServiceBus
在.Net应用中发送消息,它会直接JSON解析节点应用中的消息
例如
这是我发送消息的 C# 代码:
class Program
{
static void Main(string[] args)
{
string connectionString = "Endpoint=sb://...";
var client = new TopicClient(connectionString, "");
var payload = JsonConvert.SerializeObject(new DemoMessage() { Title = $"hello!!! {DateTime.Now}" });
var serviceBusMessage = new Message(Encoding.UTF8.GetBytes(payload));
serviceBusMessage.SessionId = Guid.NewGuid().ToString("D");
client.SendAsync(serviceBusMessage).Wait();
}
private class DemoMessage
{
public DemoMessage()
{
}
public string Title { get; set; }
}
这是我的 Node.js 接收消息的代码:
const { ServiceBusClient, ReceiveMode } = require("@azure/service-bus");
// Define connection string and related Service Bus entity names here
const connectionString =
"Endpoint=sb://";
const topicName = "***";
const subscriptionName = "***";
async function main() {
const sbClient = ServiceBusClient.createFromConnectionString(
connectionString,
);
const subscriptionClient = sbClient.createSubscriptionClient(
topicName,
subscriptionName,
);
const receiver = subscriptionClient.createReceiver(ReceiveMode.receiveAndDelete);
try {
const messages = await receiver.receiveMessages(1);
console.log("Received messages:");
console.log(messages.map((message) => message.body));
await subscriptionClient.close();
} finally {
await sbClient.close();
}
}
main().catch((err) => {
console.log("Error occurred: ", err);
});
此外,如果你还使用库WindowsAzure.ServiceBus
, we need to use BrokeredMessage(Stream messageBodyStream, bool ownsStream)
来初始化一个对象
因为我们使用BrokeredMessage(platload)
来初始化,它会使用DataContractSerializer和一个二进制的XmlDictionaryWriter来初始化一个对象。因此,负载正在使用带有二进制 XmlDictionaryWriter 的 DataContractSerializer 进行序列化,这就是为什么消息正文在其开头具有类型指示 @string3http://schemas.microsoft.com/2003/10/Serialization/
.
例如 这是我发送消息的 C# 代码:
var client =TopicClient.CreateFromConnectionString(connectionString, "test");
var payload = JsonConvert.SerializeObject(new DemoMessage() { Title = $"hello BrokeredMessage!!! {DateTime.Now}" });
using (Stream stream = new MemoryStream(Encoding.UTF8.GetBytes(payload))) {
var serviceBusMessage = new BrokeredMessage(stream,true);
await client.SendAsync(serviceBusMessage);
}
我用同样的代码接收
详情请参考here。