Flink RMQSource

Flink RMQSource

我想测试 RMQSource class 从 RabbitMQ 接收数据,但我不知道如何为我的交换配置 Rabbit 虚拟主机,我认为这是我遇到的问题。我的代码:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object rabbitjob {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print


def main (args:Array[String]){
    env.execute("Test Rabbit")
 }
} 

IntelliJ 错误IDE: Error:(10, 29) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String] val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print

^

Error:(10, 29) not enough arguments for method addSource: (implicit evidence: org.apache.flink.api.common.typeinfo.TypeInformation[String])org.apache.flink.streaming.api.scala.DataStream[String]. Unspecified value parameter evidence. val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print

^

知道如何解决它或替代方案吗? 提前谢谢你。

您还需要提供虚拟主机名称。看看AMQP URI spec.

在您的情况下,整个 AMQP URI 看起来像 "user:pass@192.168.1.11:5672/TestVHost"

您看到的错误是 Scala 编译时错误,由一些需要的导入不存在引起。每当您使用 Flink Scala API 时,您应该包括以下内容:

import org.apache.flink.api.scala._

这将解决您遇到的编译时问题。

随着时间的推移,情况发生了变化。 请看一下RMQConnectionConfig:在这里你可以找到通过构建器模式指定虚拟主机的方法。