在不自动创建 RabbitMQ 队列的情况下使用 RabbitMQ 作为 Flink DataStream 源
Using RabbitMQ as Flink DataStream Source without create RabbitMQ queue automatically
当我使用RabbitMQ作为Flink DataStream Source时,正如Flink Documentation所说
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpointing is required for exactly-once or at-least-once guarantees
env.enableCheckpointing(...);
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5000)
...
.build();
final DataStream<String> stream = env
.addSource(new RMQSource<String>(
connectionConfig, // config for the RabbitMQ connection
"queueName", // name of the RabbitMQ queue to consume
true, // use correlation ids; can be false if only at-least-once is required
new SimpleStringSchema())) // deserialization schema to turn messages into Java objects
.setParallelism(1); // non-parallel source is only required for exactly-once
此代码将连接到 RabbitMQ 并自动创建队列 "queueName"。所以我遇到了问题。 RabbitMQ 队列已经存在,我之前创建了它。我不希望 Flink 再次尝试创建。问题是 Flink 创建的 Queue 没有一些参数,这与我之前创建的 Queue 冲突。这是例外:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'queueName' in vhost '/': received none but current is the value '604800000' of type 'long', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:443)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:263)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:136)
... 10 more
如何让 Flink 只订阅一个 RabbitMQ 队列而不尝试创建一个新队列?谢谢大家
您可以编写自己的 class 扩展 RMQSource 并覆盖 setupQueue 方法,以便不创建队列
当我使用RabbitMQ作为Flink DataStream Source时,正如Flink Documentation所说
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpointing is required for exactly-once or at-least-once guarantees
env.enableCheckpointing(...);
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5000)
...
.build();
final DataStream<String> stream = env
.addSource(new RMQSource<String>(
connectionConfig, // config for the RabbitMQ connection
"queueName", // name of the RabbitMQ queue to consume
true, // use correlation ids; can be false if only at-least-once is required
new SimpleStringSchema())) // deserialization schema to turn messages into Java objects
.setParallelism(1); // non-parallel source is only required for exactly-once
此代码将连接到 RabbitMQ 并自动创建队列 "queueName"。所以我遇到了问题。 RabbitMQ 队列已经存在,我之前创建了它。我不希望 Flink 再次尝试创建。问题是 Flink 创建的 Queue 没有一些参数,这与我之前创建的 Queue 冲突。这是例外:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'queueName' in vhost '/': received none but current is the value '604800000' of type 'long', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:443)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:263)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:136)
... 10 more
如何让 Flink 只订阅一个 RabbitMQ 队列而不尝试创建一个新队列?谢谢大家
您可以编写自己的 class 扩展 RMQSource 并覆盖 setupQueue 方法,以便不创建队列