Flink RMQSource
Flink RMQSource
我想测试 RMQSource class 从 RabbitMQ 接收数据,但我不知道如何为我的交换配置 Rabbit 虚拟主机,我认为这是我遇到的问题。我的代码:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
object rabbitjob {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print
def main (args:Array[String]){
env.execute("Test Rabbit")
}
}
IntelliJ 错误IDE:
Error:(10, 29) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print
^
Error:(10, 29) not enough arguments for method addSource: (implicit evidence: org.apache.flink.api.common.typeinfo.TypeInformation[String])org.apache.flink.streaming.api.scala.DataStream[String].
Unspecified value parameter evidence.
val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print
^
知道如何解决它或替代方案吗?
提前谢谢你。
您还需要提供虚拟主机名称。看看AMQP URI spec.
在您的情况下,整个 AMQP URI 看起来像 "user:pass@192.168.1.11:5672/TestVHost"
。
您看到的错误是 Scala 编译时错误,由一些需要的导入不存在引起。每当您使用 Flink Scala API 时,您应该包括以下内容:
import org.apache.flink.api.scala._
这将解决您遇到的编译时问题。
随着时间的推移,情况发生了变化。
请看一下RMQConnectionConfig:在这里你可以找到通过构建器模式指定虚拟主机的方法。
我想测试 RMQSource class 从 RabbitMQ 接收数据,但我不知道如何为我的交换配置 Rabbit 虚拟主机,我认为这是我遇到的问题。我的代码:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
object rabbitjob {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print
def main (args:Array[String]){
env.execute("Test Rabbit")
}
}
IntelliJ 错误IDE:
Error:(10, 29) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print
^
Error:(10, 29) not enough arguments for method addSource: (implicit evidence: org.apache.flink.api.common.typeinfo.TypeInformation[String])org.apache.flink.streaming.api.scala.DataStream[String].
Unspecified value parameter evidence.
val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print
^
知道如何解决它或替代方案吗? 提前谢谢你。
您还需要提供虚拟主机名称。看看AMQP URI spec.
在您的情况下,整个 AMQP URI 看起来像 "user:pass@192.168.1.11:5672/TestVHost"
。
您看到的错误是 Scala 编译时错误,由一些需要的导入不存在引起。每当您使用 Flink Scala API 时,您应该包括以下内容:
import org.apache.flink.api.scala._
这将解决您遇到的编译时问题。
随着时间的推移,情况发生了变化。 请看一下RMQConnectionConfig:在这里你可以找到通过构建器模式指定虚拟主机的方法。