Azure 事件中心事件属性是如何序列化的?

How are Azure Event Hub event properties serialized?

我想生成并发送 EventData 对象,这些对象在 EventData.Properties 字典中设置了各种特定于应用程序的属性。 Properties 字典本身定义为 IDictionary<string, object>,这意味着我可以将任何数据类型作为值传递。

var eventData = new EventData(bytes);
eventData.Properties["Prop1"] = // string?
eventData.Properties["Prop2"] = // int?
eventData.Properties["Prop3"] = // DateTime?
eventData.Properties["Prop4"] = // Custom?

实际上 可以将哪些数据类型传递到 Properties 字典中?显然这些数据需要以某种方式序列化,但文档没有提及任何相关内容。

我通过尝试一些不起作用的东西找到了更多信息,然后通过查看异常找到了源代码call-stack。

在我的例子中,支持的 属性 类型似乎源自 AMQP 协议支持的类型(记录在案 here)。在内部 AmqpMessageConverter 代码中实际上有一个很大的 switch 语句,它基本上给出了我的答案:

AmqpMessageConverter.TryGetAmqpObjectFromNetObject

总结一下:

  • 所有 .NET 基元类型(intstringdouble 等)
  • Guid
  • DateTimeDateTimeOffset
  • Stream
  • Uri
  • TimeSpan
  • byte[]
  • IList
  • IDictionary

我自己的自定义类型(POCO - "Plain old CLR object")未被接受,并导致以下异常:

System.Runtime.Serialization.SerializationException: Serialization operation failed due to unsupported type EventHubsTesting.Program+Poco.
   at Microsoft.Azure.EventHubs.Amqp.AmqpMessageConverter.TryGetAmqpObjectFromNetObject(Object netObject, MappingType mappingType, Object& amqpObject)
   at Microsoft.Azure.EventHubs.Amqp.AmqpMessageConverter.UpdateAmqpMessageHeadersAndProperties(AmqpMessage message, String publisher, EventData eventData, Boolean copyUserProperties)
   at Microsoft.Azure.EventHubs.Amqp.AmqpMessageConverter.EventDataToAmqpMessage(EventData eventData)
   at Microsoft.Azure.EventHubs.Amqp.AmqpMessageConverter.EventDatasToAmqpMessage(IEnumerable`1 eventDatas, String partitionKey)
   at Microsoft.Azure.EventHubs.Amqp.AmqpEventDataSender.OnSendAsync(IEnumerable`1 eventDatas, String partitionKey)
   at Microsoft.Azure.EventHubs.EventDataSender.SendAsync(IEnumerable`1 eventDatas, String partitionKey)
   at Microsoft.Azure.EventHubs.EventHubClient.SendAsync(IEnumerable`1 eventDatas, String partitionKey)
   at EventHubsTesting.Program.Sender(CancellationTokenSource shutdownSource) in C:\EventHubsTesting\Program.cs:line 99

我假设如果我想使用自定义类型,那么在将它们分配为应用程序属性之前我需要自己序列化它们。

跟进 Chris 的回答 - AMQP headers 根据 AMQP 类型规范进行序列化 - http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html.

对于查看此答案的任何 Java 开发人员 - 您可以使用 Apache qpid 库来执行 encode/decode 操作。解码器 class 可以在这里找到 - https://qpid.apache.org/releases/qpid-proton-j-0.33.1/api/index.html.

EH 服务是 encoding-agnostic,因此通过 Kafka 协议读取的消费者需要手动解码 AMQP headers。本机 EH 客户端将为您解码 headers。参见 - https://github.com/Azure/azure-event-hubs-for-kafka/issues/56.