Azure 事件中心事件属性是如何序列化的?
How are Azure Event Hub event properties serialized?
我想生成并发送 EventData
对象,这些对象在 EventData.Properties 字典中设置了各种特定于应用程序的属性。 Properties
字典本身定义为 IDictionary<string, object>
,这意味着我可以将任何数据类型作为值传递。
var eventData = new EventData(bytes);
eventData.Properties["Prop1"] = // string?
eventData.Properties["Prop2"] = // int?
eventData.Properties["Prop3"] = // DateTime?
eventData.Properties["Prop4"] = // Custom?
我 实际上 可以将哪些数据类型传递到 Properties
字典中?显然这些数据需要以某种方式序列化,但文档没有提及任何相关内容。
我通过尝试一些不起作用的东西找到了更多信息,然后通过查看异常找到了源代码call-stack。
在我的例子中,支持的 属性 类型似乎源自 AMQP 协议支持的类型(记录在案 here)。在内部 AmqpMessageConverter
代码中实际上有一个很大的 switch 语句,它基本上给出了我的答案:
AmqpMessageConverter.TryGetAmqpObjectFromNetObject
总结一下:
- 所有 .NET 基元类型(
int
、string
、double
等)
Guid
DateTime
和 DateTimeOffset
Stream
Uri
TimeSpan
byte[]
IList
IDictionary
我自己的自定义类型(POCO - "Plain old CLR object")未被接受,并导致以下异常:
System.Runtime.Serialization.SerializationException: Serialization operation failed due to unsupported type EventHubsTesting.Program+Poco.
at Microsoft.Azure.EventHubs.Amqp.AmqpMessageConverter.TryGetAmqpObjectFromNetObject(Object netObject, MappingType mappingType, Object& amqpObject)
at Microsoft.Azure.EventHubs.Amqp.AmqpMessageConverter.UpdateAmqpMessageHeadersAndProperties(AmqpMessage message, String publisher, EventData eventData, Boolean copyUserProperties)
at Microsoft.Azure.EventHubs.Amqp.AmqpMessageConverter.EventDataToAmqpMessage(EventData eventData)
at Microsoft.Azure.EventHubs.Amqp.AmqpMessageConverter.EventDatasToAmqpMessage(IEnumerable`1 eventDatas, String partitionKey)
at Microsoft.Azure.EventHubs.Amqp.AmqpEventDataSender.OnSendAsync(IEnumerable`1 eventDatas, String partitionKey)
at Microsoft.Azure.EventHubs.EventDataSender.SendAsync(IEnumerable`1 eventDatas, String partitionKey)
at Microsoft.Azure.EventHubs.EventHubClient.SendAsync(IEnumerable`1 eventDatas, String partitionKey)
at EventHubsTesting.Program.Sender(CancellationTokenSource shutdownSource) in C:\EventHubsTesting\Program.cs:line 99
我假设如果我想使用自定义类型,那么在将它们分配为应用程序属性之前我需要自己序列化它们。
跟进 Chris 的回答 - AMQP headers 根据 AMQP 类型规范进行序列化 - http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html.
对于查看此答案的任何 Java 开发人员 - 您可以使用 Apache qpid 库来执行 encode/decode 操作。解码器 class 可以在这里找到 - https://qpid.apache.org/releases/qpid-proton-j-0.33.1/api/index.html.
EH 服务是 encoding-agnostic,因此通过 Kafka 协议读取的消费者需要手动解码 AMQP headers。本机 EH 客户端将为您解码 headers。参见 - https://github.com/Azure/azure-event-hubs-for-kafka/issues/56.
我想生成并发送 EventData
对象,这些对象在 EventData.Properties 字典中设置了各种特定于应用程序的属性。 Properties
字典本身定义为 IDictionary<string, object>
,这意味着我可以将任何数据类型作为值传递。
var eventData = new EventData(bytes);
eventData.Properties["Prop1"] = // string?
eventData.Properties["Prop2"] = // int?
eventData.Properties["Prop3"] = // DateTime?
eventData.Properties["Prop4"] = // Custom?
我 实际上 可以将哪些数据类型传递到 Properties
字典中?显然这些数据需要以某种方式序列化,但文档没有提及任何相关内容。
我通过尝试一些不起作用的东西找到了更多信息,然后通过查看异常找到了源代码call-stack。
在我的例子中,支持的 属性 类型似乎源自 AMQP 协议支持的类型(记录在案 here)。在内部 AmqpMessageConverter
代码中实际上有一个很大的 switch 语句,它基本上给出了我的答案:
AmqpMessageConverter.TryGetAmqpObjectFromNetObject
总结一下:
- 所有 .NET 基元类型(
int
、string
、double
等) Guid
DateTime
和DateTimeOffset
Stream
Uri
TimeSpan
byte[]
IList
IDictionary
我自己的自定义类型(POCO - "Plain old CLR object")未被接受,并导致以下异常:
System.Runtime.Serialization.SerializationException: Serialization operation failed due to unsupported type EventHubsTesting.Program+Poco.
at Microsoft.Azure.EventHubs.Amqp.AmqpMessageConverter.TryGetAmqpObjectFromNetObject(Object netObject, MappingType mappingType, Object& amqpObject)
at Microsoft.Azure.EventHubs.Amqp.AmqpMessageConverter.UpdateAmqpMessageHeadersAndProperties(AmqpMessage message, String publisher, EventData eventData, Boolean copyUserProperties)
at Microsoft.Azure.EventHubs.Amqp.AmqpMessageConverter.EventDataToAmqpMessage(EventData eventData)
at Microsoft.Azure.EventHubs.Amqp.AmqpMessageConverter.EventDatasToAmqpMessage(IEnumerable`1 eventDatas, String partitionKey)
at Microsoft.Azure.EventHubs.Amqp.AmqpEventDataSender.OnSendAsync(IEnumerable`1 eventDatas, String partitionKey)
at Microsoft.Azure.EventHubs.EventDataSender.SendAsync(IEnumerable`1 eventDatas, String partitionKey)
at Microsoft.Azure.EventHubs.EventHubClient.SendAsync(IEnumerable`1 eventDatas, String partitionKey)
at EventHubsTesting.Program.Sender(CancellationTokenSource shutdownSource) in C:\EventHubsTesting\Program.cs:line 99
我假设如果我想使用自定义类型,那么在将它们分配为应用程序属性之前我需要自己序列化它们。
跟进 Chris 的回答 - AMQP headers 根据 AMQP 类型规范进行序列化 - http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html.
对于查看此答案的任何 Java 开发人员 - 您可以使用 Apache qpid 库来执行 encode/decode 操作。解码器 class 可以在这里找到 - https://qpid.apache.org/releases/qpid-proton-j-0.33.1/api/index.html.
EH 服务是 encoding-agnostic,因此通过 Kafka 协议读取的消费者需要手动解码 AMQP headers。本机 EH 客户端将为您解码 headers。参见 - https://github.com/Azure/azure-event-hubs-for-kafka/issues/56.