How to consume Kafka to MySQL using Beam in Scala
I want to consume data from a Kafka stream and write it to MySQL using Apache Beam. At .apply(JdbcIO.write[KV[Integer, String]]()... I get the error Cannot resolve overloaded method 'apply'.
Here is my complete code:
def main(cmdlineArgs: Array[String]): Unit = {
  val options = PipelineOptionsFactory.create()
  val pipeline = Pipeline.create(options)

  pipeline
    .apply(KafkaIO.read[Long, String]()
      .withBootstrapServers("kafka-stag.my-domain.com:31090")
      .withTopic("topic-saya")
      .withKeyDeserializer(classOf[LongDeserializer])
      .withValueDeserializer(classOf[StringDeserializer])
      .updateConsumerProperties(
        ImmutableMap.of("auto.offset.reset", "earliest".asInstanceOf[Object])
      )
      .withMaxNumRecords(5)
      .withoutMetadata()
    )
    .apply(MapElements.into(TypeDescriptors.strings()))
    .apply(JdbcIO.write[KV[Integer, String]]()
      .withDataSourceConfiguration(DataSourceConfiguration.create(
        "com.mysql.jdbc.Driver",
        "jdbc:mysql://localhost:3306/mydb")
        .withUsername("root")
        .withPassword("secret"))
      .withStatement("insert into Person values(?, ?)")
      .withPreparedStatementSetter(
        (element: KV[Integer, String], query: PreparedStatement) => {
          query.setInt(1, element.getKey())
          query.setString(2, element.getValue())
        })
    )

  pipeline.run().waitUntilFinish()
}
What should I do to fix this?
I find it useful to break long method chains apart when debugging Scala code. In this case, you're chaining three apply(...) calls together:
// Heads up: you'll want to specify java.lang.Long when working
// with Java code and generics... this has bitten me *many* times.
val x: PCollection[KV[java.lang.Long, String]] = pipeline
  .apply(KafkaIO.read[java.lang.Long, String]()
    .withThisAndThatConfig(...))

// Probably not what you're looking for...
// MapElements requires via() to modify the element.
val y: PCollection[String] = x
  .apply(MapElements.into(TypeDescriptors.strings()))

// The error happens below: JdbcIO.write expects to be applied to a
// PCollection[KV[Integer, String]], which is not what it's getting.
val z: PDone = y
  .apply(JdbcIO.write[KV[Integer, String]]()
    .withThisAndThatConfig(...))
If I understand what you're after, you'll want to take a closer look at that middle apply and make sure it emits the right element type for JdbcIO. Juggling the two languages is a bit of a hassle, but you'll probably end up with something like this:
val keysToInt: SerializableFunction[KV[java.lang.Long, String], KV[Integer, String]] =
  (input: KV[java.lang.Long, String]) => KV.of(input.getKey.intValue, input.getValue)

val y: PCollection[KV[Integer, String]] = x
  .apply(MapElements
    .into(TypeDescriptors.kvs(TypeDescriptors.integers, TypeDescriptors.strings))
    .via(keysToInt))
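
Putting it all together, here's a rough end-to-end sketch with the fixed middle transform (untested against your cluster; it reuses pipeline and keysToInt from above, keeps the topic, table, and credentials from your snippet, and swaps the inline lambda for an explicit PreparedStatementSetter, which sidesteps SAM-conversion trouble on older Scala versions):

// Read Kafka with java.lang.Long keys so the generics line up.
val records: PCollection[KV[java.lang.Long, String]] = pipeline
  .apply(KafkaIO.read[java.lang.Long, String]()
    .withBootstrapServers("kafka-stag.my-domain.com:31090")
    .withTopic("topic-saya")
    .withKeyDeserializer(classOf[LongDeserializer])
    .withValueDeserializer(classOf[StringDeserializer])
    .withMaxNumRecords(5)
    .withoutMetadata())

// Narrow the java.lang.Long keys to Integer to match what JdbcIO expects.
val rows: PCollection[KV[Integer, String]] = records
  .apply(MapElements
    .into(TypeDescriptors.kvs(TypeDescriptors.integers, TypeDescriptors.strings))
    .via(keysToInt))

rows.apply(JdbcIO.write[KV[Integer, String]]()
  .withDataSourceConfiguration(DataSourceConfiguration.create(
      "com.mysql.jdbc.Driver",
      "jdbc:mysql://localhost:3306/mydb")
    .withUsername("root")
    .withPassword("secret"))
  .withStatement("insert into Person values(?, ?)")
  .withPreparedStatementSetter(
    // An explicit anonymous class instead of a Scala lambda, so the
    // Java SAM interface is implemented unambiguously.
    new JdbcIO.PreparedStatementSetter[KV[Integer, String]] {
      override def setParameters(element: KV[Integer, String],
                                 query: PreparedStatement): Unit = {
        query.setInt(1, element.getKey)
        query.setString(2, element.getValue)
      }
    }))

pipeline.run().waitUntilFinish()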