无法在 RabbitMQ 接收器中将 content_type 从 application/octet-stream 更改为 application/json
Can't change content_type from application/octet-stream to application/json in RabbitMQ sink
我有一个简单的 RabbitMQ 源和接收器。我正在使用以下属性向源队列发布消息:
content_type -> application/json
和一个JSON有效载荷:
{
"userId": 2,
"customerId": 1,
}
RabbitMQ 接收器使用 application/octet-stream
而不是 JSON 获取消息。
我尝试启动具有以下属性的应用程序:
spring.cloud.stream.default.contentType=application/json
但没有用。
流定义:
stream_1=rabbitSource: rabbit --queues=queue1 --password=p --host=h --username=u | sink: rabbit --exchange=ex --routing-key=rk --converter-bean-name=jsonConverter --password=p --host=h --username=u
如何将内容类型设置为 application/json
? reference guide 似乎没有答案。
发布版本:
- spring-cloud-dataflow-server:2.0.1.RELEASE
- spring-cloud-skipper-server:2.0.0.RELEASE
更新:
正如@SabbyAnandan 的回答所建议的,我现在 运行:
dataflow:>stream create --name test123 --definition "rabbitSource: rabbit --queues=queue --password=p --host=rmq --username=u --spring.cloud.stream.bindings.output.contentType='application/json' | sink: rabbit --exchange=ex --routing-key=rk --converter-bean-name=jsonConverter --password=p --host=rmq --username=p"
Created new stream 'test123123'
dataflow:>stream deploy --name test123 --properties "app.rabbit.spring.cloud.stream.bindings.output.contentType='application/json'"
Deployment request has been sent for stream 'test123'
但是content_type
还是老样子
在 Rabbit 源头中设置的默认内容类型是:content-type: application/x-java-serialized-object
。
您可以通过传递覆盖该行为:
1) --spring.cloud.stream.bindings.output.contentType='application/json'
作为 Rabbit 源的内联 属性。
2) 在部署流时,您可以通过 --properties="app.rabbit.spring.cloud.stream.bindings.output.contentType='application/json'
.
覆盖特定于应用程序的 属性
部署后,您可以通过转到特定于应用程序的 http://<APP-HOST>:<APP-PORT?/actuator/configprops
端点来确认 属性 是否被覆盖。
要解决所有这些问题,您可以 运行 独立应用程序(即 java -jar..
)以在通过 SCDF 运行 启用它们之前确认行为。如果它独立工作,它也应该在 SCDF 中工作。
@Maroun,如前所述,这里有一些选项。
- Rabbit 接收器始终以
byte[]
接收数据。您可以通过提供配置 (--rabbit.converterBeanName=jsonConverter
) 强制它使用 json 转换器。但默认情况下使用 Base64 编码器,生成的文本将进行 Base64 编码。然后在自定义应用程序中,它从接收器发布到的交换接收数据,您需要提供一个自定义转换器,该转换器 属性 使用 Base64 解码器进行解码。
- 您的另一个选择是修补 Rabbit 水槽以满足您的需要。克隆 Rabbit app starter repo,然后提供一个
PassthroughConverter
,它只是将传入的 byte[]
传递到下游。您还需要以某种方式更改 Message header 上的内容类型(更改为 application/json
)。然后正常的 Jackson
转换器将在自定义应用程序端工作,因为在这种情况下 byte[]
不是 Base64 编码的。
我有一个简单的 RabbitMQ 源和接收器。我正在使用以下属性向源队列发布消息:
content_type -> application/json
和一个JSON有效载荷:
{
"userId": 2,
"customerId": 1,
}
RabbitMQ 接收器使用 application/octet-stream
而不是 JSON 获取消息。
我尝试启动具有以下属性的应用程序:
spring.cloud.stream.default.contentType=application/json
但没有用。
流定义:
stream_1=rabbitSource: rabbit --queues=queue1 --password=p --host=h --username=u | sink: rabbit --exchange=ex --routing-key=rk --converter-bean-name=jsonConverter --password=p --host=h --username=u
如何将内容类型设置为 application/json
? reference guide 似乎没有答案。
发布版本:
- spring-cloud-dataflow-server:2.0.1.RELEASE
- spring-cloud-skipper-server:2.0.0.RELEASE
更新:
正如@SabbyAnandan 的回答所建议的,我现在 运行:
dataflow:>stream create --name test123 --definition "rabbitSource: rabbit --queues=queue --password=p --host=rmq --username=u --spring.cloud.stream.bindings.output.contentType='application/json' | sink: rabbit --exchange=ex --routing-key=rk --converter-bean-name=jsonConverter --password=p --host=rmq --username=p"
Created new stream 'test123123'
dataflow:>stream deploy --name test123 --properties "app.rabbit.spring.cloud.stream.bindings.output.contentType='application/json'"
Deployment request has been sent for stream 'test123'
但是content_type
还是老样子
在 Rabbit 源头中设置的默认内容类型是:content-type: application/x-java-serialized-object
。
您可以通过传递覆盖该行为:
1) --spring.cloud.stream.bindings.output.contentType='application/json'
作为 Rabbit 源的内联 属性。
2) 在部署流时,您可以通过 --properties="app.rabbit.spring.cloud.stream.bindings.output.contentType='application/json'
.
部署后,您可以通过转到特定于应用程序的 http://<APP-HOST>:<APP-PORT?/actuator/configprops
端点来确认 属性 是否被覆盖。
要解决所有这些问题,您可以 运行 独立应用程序(即 java -jar..
)以在通过 SCDF 运行 启用它们之前确认行为。如果它独立工作,它也应该在 SCDF 中工作。
@Maroun,如前所述,这里有一些选项。
- Rabbit 接收器始终以
byte[]
接收数据。您可以通过提供配置 (--rabbit.converterBeanName=jsonConverter
) 强制它使用 json 转换器。但默认情况下使用 Base64 编码器,生成的文本将进行 Base64 编码。然后在自定义应用程序中,它从接收器发布到的交换接收数据,您需要提供一个自定义转换器,该转换器 属性 使用 Base64 解码器进行解码。 - 您的另一个选择是修补 Rabbit 水槽以满足您的需要。克隆 Rabbit app starter repo,然后提供一个
PassthroughConverter
,它只是将传入的byte[]
传递到下游。您还需要以某种方式更改 Message header 上的内容类型(更改为application/json
)。然后正常的Jackson
转换器将在自定义应用程序端工作,因为在这种情况下byte[]
不是 Base64 编码的。