无法在 RabbitMQ 接收器中将 content_type 从 application/octet-stream 更改为 application/json

Can't change content_type from application/octet-stream to application/json in RabbitMQ sink

我有一个简单的 RabbitMQ 源和接收器。我正在使用以下属性向源队列发布消息:

content_type -> application/json

和一个JSON有效载荷:

{
  "userId": 2,
  "customerId": 1,
}

RabbitMQ 接收器使用 application/octet-stream 而不是 JSON 获取消息。

我尝试启动具有以下属性的应用程序:

spring.cloud.stream.default.contentType=application/json

但没有用。

流定义:

stream_1=rabbitSource: rabbit --queues=queue1 --password=p --host=h --username=u | sink: rabbit --exchange=ex --routing-key=rk --converter-bean-name=jsonConverter --password=p --host=h --username=u

如何将内容类型设置为 application/jsonreference guide 似乎没有答案。

发布版本:

更新:

正如@SabbyAnandan 的回答所建议的,我现在 运行:

dataflow:>stream create --name test123 --definition "rabbitSource: rabbit --queues=queue --password=p --host=rmq --username=u --spring.cloud.stream.bindings.output.contentType='application/json' | sink: rabbit --exchange=ex --routing-key=rk --converter-bean-name=jsonConverter --password=p --host=rmq --username=p"
Created new stream 'test123123'

dataflow:>stream deploy --name test123 --properties "app.rabbit.spring.cloud.stream.bindings.output.contentType='application/json'"
Deployment request has been sent for stream 'test123'

但是content_type还是老样子

在 Rabbit 源头中设置的默认内容类型是:content-type: application/x-java-serialized-object

您可以通过传递覆盖该行为:

1) --spring.cloud.stream.bindings.output.contentType='application/json' 作为 Rabbit 源的内联 属性。

2) 在部署流时,您可以通过 --properties="app.rabbit.spring.cloud.stream.bindings.output.contentType='application/json'.

覆盖特定于应用程序的 属性

部署后,您可以通过转到特定于应用程序的 http://<APP-HOST>:<APP-PORT?/actuator/configprops 端点来确认 属性 是否被覆盖。

要解决所有这些问题,您可以 运行 独立应用程序(即 java -jar..)以在通过 SCDF 运行 启用它们之前确认行为。如果它独立工作,它也应该在 SCDF 中工作。

@Maroun,如前所述,这里有一些选项。

  1. Rabbit 接收器始终以 byte[] 接收数据。您可以通过提供配置 (--rabbit.converterBeanName=jsonConverter) 强制它使用 json 转换器。但默认情况下使用 Base64 编码器,生成的文本将进行 Base64 编码。然后在自定义应用程序中,它从接收器发布到的交换接收数据,您需要提供一个自定义转换器,该转换器 属性 使用 Base64 解码器进行解码。
  2. 您的另一个选择是修补 Rabbit 水槽以满足您的需要。克隆 Rabbit app starter repo,然后提供一个 PassthroughConverter,它只是将传入的 byte[] 传递到下游。您还需要以某种方式更改 Message header 上的内容类型(更改为 application/json)。然后正常的 Jackson 转换器将在自定义应用程序端工作,因为在这种情况下 byte[] 不是 Base64 编码的。