Beam JdbcIO.readAll 似乎没有返回结果
Beam JdbcIO.readAll doesn't seem to return results
我正在尝试使用 Apache Beam 为事件构建管道。我想做的是从 GCP PubSub 读取流数据并使用事件中的 ID 从 MySQL 读取相关元数据,然后将这两个流合并并写入我的 clickhouse 数据库。
但是 JdbcIO.readAll() 似乎没有返回它的结果流。正如您在 ClickhousePipeline 中看到的那样,我试图通过 CoGroupByKey.create() 合并两个 PCollection,但是 userMetaData 是空的,而且链接在 UserMetadataEnricher() 之后的 ParDo 也没有被执行。
在 UserMetadataEnricher 的 withRowMapper 中,我添加了 println() 来检查它是否运行。它工作正常,并打印了从数据库读到的结果,但是它不会把数据返回给管道的下一步。
我猜问题与窗口(Windowing)有关:我在不使用窗口的情况下测试时,确认它可以正常工作。但是 PubsubIO 产生的是无界(Unbounded)PCollection,所以我必须应用窗口才能使用 JdbcIO.readAll(),对吗?我不知道该如何解决这个问题。希望尽快得到答复!
主管道
/**
 * Entry point of the streaming job.
 *
 * Reads strings from the Pub/Sub subscription named by `options.inputSubscription`,
 * parses each one as JSON into [FoodDetailViewEvent], sends parse failures to
 * [FailurePipeline] and successfully parsed events to [ClickhousePipeline].
 *
 * NOTE(review): events are parsed as `FoodDetailViewEvent`, while `ClickhousePipeline`
 * is declared over `DetailViewEvent` — confirm both names refer to the same type.
 */
object MainPipeline {
    /**
     * Builds the pipeline from [options], runs it and blocks until it finishes.
     */
    @JvmStatic
    fun run(options: MainPipelineOptions) {
        val pipeline = Pipeline.create(options)

        val rawMessages = pipeline.apply(
            "Read DetailViewEvent PubSub",
            PubsubIO.readStrings().fromSubscription(options.inputSubscription)
        )

        // Parse failures are emitted as KV(original element, canonical name of the exception class).
        val jsonParser = ParseJsons.of(FoodDetailViewEvent::class.java)
            .exceptionsInto(
                TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings())
            )
            .exceptionsVia { failure ->
                KV.of(failure.element(), failure.exception().javaClass.canonicalName)
            }
        val parsed = rawMessages.apply("Extract messages", jsonParser)

        val decodedEvents = parsed.output()
            .setCoder(SerializableCoder.of(FoodDetailViewEvent::class.java))
        parsed.failures().apply(FailurePipeline(options))
        decodedEvents.apply(ClickhousePipeline(options))

        pipeline.run().waitUntilFinish()
    }

    /**
     * Parses and validates the command-line [args] into [MainPipelineOptions] and starts [run].
     */
    @JvmStatic
    fun main(args: Array<String>) {
        val optionsFactory = PipelineOptionsFactory
            .fromArgs(*args)
            .withValidation()
        run(optionsFactory.`as`(MainPipelineOptions::class.java))
    }
}
Clickhouse管道
/**
 * Joins [DetailViewEvent]s with per-user metadata and writes the result to ClickHouse.
 *
 * Steps performed by [expand]:
 *  1. re-window the input into the global window with a repeating 5-second
 *     processing-time trigger (discarding fired panes, zero allowed lateness);
 *  2. key every event by `userInfo.userId`;
 *  3. look up [UserMetadata] for each user id through [UserMetadataEnricher] and key it by `userId`;
 *  4. join both keyed collections with `CoGroupByKey`;
 *  5. build one [ClickHouseModel] per joined key, convert it to a `Row` and write it with `ClickHouseIO`.
 *
 * The `println` calls are diagnostic output kept from the original post.
 */
class ClickhousePipeline(private val options: MainPipelineOptions) :
    PTransform<PCollection<DetailViewEvent>, PDone>() {

    override fun expand(events: PCollection<DetailViewEvent>): PDone {
        val windowedEvents = events.apply(
            "Window",
            Window
                .into<DetailViewEvent>(GlobalWindows())
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(5))
                    )
                )
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes()
        )

        // Left side of the join: events keyed by user id.
        val userIdDetailViewEvents = windowedEvents.apply(
            MapElements.via(
                object : SimpleFunction<DetailViewEvent, KV<String, DetailViewEvent>>() {
                    override fun apply(input: DetailViewEvent): KV<String, DetailViewEvent> {
                        return KV.of(input.userInfo.userId, input)
                    }
                }
            )
        )

        // Right side of the join: user ids -> metadata lookup -> metadata keyed by user id.
        val userMetaData = userIdDetailViewEvents
            .apply(
                MapElements.via(
                    object : SimpleFunction<KV<String, DetailViewEvent>, String>() {
                        override fun apply(input: KV<String, DetailViewEvent>): String {
                            return checkNotNull(input.key) { "keyed event has no user id" }
                        }
                    }
                )
            )
            .apply(UserMetadataEnricher(options))
            .apply(
                ParDo.of(
                    object : DoFn<UserMetadata, KV<String, UserMetadata>>() {
                        @ProcessElement
                        fun processElement(
                            @Element data: UserMetadata,
                            out: OutputReceiver<KV<String, UserMetadata>>
                        ) {
                            println("User:: ${data}") // Not printed!!
                            out.output(KV.of(data.userId, data))
                        }
                    }
                )
            )

        val sourceTag = object : TupleTag<DetailViewEvent>() {}
        val userMetadataTag = object : TupleTag<UserMetadata>() {}

        val joinedPipeline: PCollection<KV<String, CoGbkResult>> =
            KeyedPCollectionTuple.of(sourceTag, userIdDetailViewEvents)
                .and(userMetadataTag, userMetaData)
                .apply(CoGroupByKey.create())

        val enrichedData = joinedPipeline.apply(
            ParDo.of(
                object : DoFn<KV<String, CoGbkResult>, ClickHouseModel>() {
                    @ProcessElement
                    fun processElement(
                        @Element data: KV<String, CoGbkResult>,
                        out: OutputReceiver<ClickHouseModel>
                    ) {
                        val source = data.value.getAll(sourceTag)
                        val userMetadataSource = data.value.getAll(userMetadataTag)

                        println("==========================")
                        for (metadata in userMetadataSource) {
                            println("Metadata:: $metadata") // This is always empty
                        }
                        for (event in source) {
                            println("Event:: $event")
                        }
                        println("==========================")

                        // Either side of the grouped result may be empty for a given key,
                        // so read both with firstOrNull() instead of calling next() on a
                        // possibly empty iterator (which would throw NoSuchElementException).
                        val sourceEvent = source.firstOrNull() ?: return
                        val userMetadataEvent = userMetadataSource.firstOrNull() ?: return

                        // Only the first event and first metadata row per key are used.
                        out.output(
                            ClickHouseModel(
                                eventType = sourceEvent.eventType,
                                userMetadata = userMetadataEvent
                            )
                        )
                    }
                }
            )
        )

        val clickhouseData = enrichedData.apply(
            ParDo.of(
                object : DoFn<ClickHouseModel, Row>() {
                    @ProcessElement
                    fun processElement(context: ProcessContext) {
                        val data = context.element()
                        context.output(data.toSchema())
                    }
                }
            )
        )

        return clickhouseData
            .setRowSchema(ClickHouseModel.schemaType())
            .apply(
                ClickHouseIO.write(
                    "jdbc:clickhouse://127.0.0.1:8123/test?password=example",
                    "clickhouse_test"
                )
            )
    }
}
UserMetadataEnricher
/**
 * Looks up one [UserMetadata] per matching row in the MySQL `user` table for every
 * user id in the input collection, using `JdbcIO.readAll`.
 *
 * Each input string is bound as the single `?` parameter of the query; result columns
 * 1..3 (`id`, `name`, `gender`) are passed positionally to the [UserMetadata] constructor.
 *
 * NOTE(review): an answer further down in this document reports that, on an unbounded
 * input, this read only emits results when `.withOutputParallelization(false)` is set —
 * confirm against the JdbcIO documentation before relying on the default.
 */
class UserMetadataEnricher(private val options: MainPipelineOptions) :
    PTransform<PCollection<String>, PCollection<UserMetadata>>() {

    @Throws(Exception::class)
    override fun expand(events: PCollection<String>): PCollection<UserMetadata> {
        val dataSource = JdbcIO.DataSourceConfiguration
            .create("com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/beam-test")
            .withUsername("root")
            .withPassword("example")

        val metadataLookup = JdbcIO.readAll<String, UserMetadata>()
            .withDataSourceConfiguration(dataSource)
            .withQuery("select id,name,gender from user where id = ?")
            .withParameterSetter { userId: String, statement: PreparedStatement ->
                statement.setString(1, userId)
            }
            .withCoder(SerializableCoder.of(UserMetadata::class.java))
            .withRowMapper { row ->
                println("RowMapper:: ${row.getString(1)}") // printed!!
                UserMetadata(
                    row.getString(1),
                    row.getString(2),
                    row.getString(3)
                )
            }

        return events.apply(metadataLookup)
    }
}
输出
RowMapper:: test-02
RowMapper:: test-01
==========================
Event:: DetailViewEvent(...)
==========================
==========================
Event:: DetailViewEvent(...)
==========================
更新 1(全局窗口到固定窗口)
使用 AfterProcessingTime
我已经更改了窗口设置,并在赋值给 userIdDetailViewEvents 的 SimpleFunction 中添加了打印语句。
// Excerpt: 30-second fixed windows for FoodDetailViewEvent.
Window.into<FoodDetailViewEvent>(FixedWindows.of(Duration.standardSeconds(30)))
// Fire repeatedly, 1 second of processing time after the first element of each pane.
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
Duration.standardSeconds(1)
)
)
)
// No late data is accepted; each fired pane is discarded after firing.
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes()
// The parenthesis below closes an enclosing call that is not part of this excerpt.
)
它打印:
userIdDetailViewEvents Called
userIdDetailViewEvents Called
RowMapper:: test-02
RowMapper:: test-01
==========================
Event:: DetailViewEvent(...)
==========================
==========================
Event:: DetailViewEvent(...)
==========================
使用 AfterWatermark
// Excerpt: 30-second fixed windows for FoodDetailViewEvent.
Window.into<FoodDetailViewEvent>(FixedWindows.of(Duration.standardSeconds(30)))
// Fire repeatedly once the watermark passes the end of the window.
.triggering(
Repeatedly.forever(
AfterWatermark.pastEndOfWindow()
)
)
// No late data is accepted; each fired pane is discarded after firing.
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes()
产出
userIdDetailViewEvents Called
userIdDetailViewEvents Called
RowMapper:: test-02
RowMapper:: test-01
我认为使用 AfterWatermark 是正确的,但它挂在某个地方...我想是 JdbcIO
GlobalWindow 永远不会关闭,因此它不适合像 Pub/Sub 这样的无界数据源。我建议改用 FixedWindow(<Some time range>)。
您可以在此处阅读有关 windows 的更多信息 https://beam.apache.org/documentation/programming-guide/#windowing
你是对的,在标准配置下,对于无界集合它不会返回结果。
只要你有一个可以正常工作的窗口/触发器组合,诀窍就是在 JdbcIO.<>readAll() 调用上设置 .withOutputParallelization(false),如下所示:
// Java sketch from the answer: applies JdbcIO.readAll with output parallelization disabled.
// The `<>` type arguments are left unspecified here, and getConfiguration() /
// getStreamingQuery() are not shown in this document.
p.apply("Read from JDBC", JdbcIO.<>readAll()
.withDataSourceConfiguration(getConfiguration())
.withQuery(getStreamingQuery())
.withParameterSetter((JdbcIO.PreparedStatementSetter<String>) (element, preparedStatement) -> {
// Prepare statement here.
})
.withRowMapper((JdbcIO.RowMapper<>) results -> {
// Map results here.
// withOutputParallelization(false) below is the setting the answer recommends.
}).withOutputParallelization(false));
我为此苦苦挣扎了几个小时,后来在这篇文章中找到了这段代码,终于成功了。
我使用了
// Excerpt (Java): global window whose trigger fires repeatedly as soon as a pane
// holds at least one element; fired panes are discarded.
Window.into(new GlobalWindows()).triggering(
Repeatedly.forever(AfterPane.elementCountAtLeast(1))
)
// The final parenthesis closes an enclosing call that is not part of this excerpt.
.discardingFiredPanes())
让我的触发器一次处理一个元素。
我正在尝试使用 Apache Beam 为事件构建管道。我想做的是从 GCP PubSub 读取流数据并使用事件中的 ID 从 MySQL 读取相关元数据,然后将这两个流合并并写入我的 clickhouse 数据库。
但是 JdbcIO.readAll() 似乎没有返回它的结果流。正如您在 ClickhousePipeline 中看到的那样,我试图通过 CoGroupByKey.create() 合并两个 PCollection,但是 userMetaData 是空的,而且链接在 UserMetadataEnricher() 之后的 ParDo 也没有被执行。
在 UserMetadataEnricher 的 withRowMapper 中,我添加了 println() 来检查它是否运行。它工作正常,并打印了从数据库读到的结果,但是它不会把数据返回给管道的下一步。
我猜问题与窗口(Windowing)有关:我在不使用窗口的情况下测试时,确认它可以正常工作。但是 PubsubIO 产生的是无界(Unbounded)PCollection,所以我必须应用窗口才能使用 JdbcIO.readAll(),对吗?我不知道该如何解决这个问题。希望尽快得到答复!
主管道
/**
 * Entry point of the streaming job.
 *
 * Reads strings from the Pub/Sub subscription named by `options.inputSubscription`,
 * parses each one as JSON into [FoodDetailViewEvent], sends parse failures to
 * [FailurePipeline] and successfully parsed events to [ClickhousePipeline].
 *
 * NOTE(review): events are parsed as `FoodDetailViewEvent`, while `ClickhousePipeline`
 * is declared over `DetailViewEvent` — confirm both names refer to the same type.
 */
object MainPipeline {
    /**
     * Builds the pipeline from [options], runs it and blocks until it finishes.
     */
    @JvmStatic
    fun run(options: MainPipelineOptions) {
        val pipeline = Pipeline.create(options)

        val rawMessages = pipeline.apply(
            "Read DetailViewEvent PubSub",
            PubsubIO.readStrings().fromSubscription(options.inputSubscription)
        )

        // Parse failures are emitted as KV(original element, canonical name of the exception class).
        val jsonParser = ParseJsons.of(FoodDetailViewEvent::class.java)
            .exceptionsInto(
                TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings())
            )
            .exceptionsVia { failure ->
                KV.of(failure.element(), failure.exception().javaClass.canonicalName)
            }
        val parsed = rawMessages.apply("Extract messages", jsonParser)

        val decodedEvents = parsed.output()
            .setCoder(SerializableCoder.of(FoodDetailViewEvent::class.java))
        parsed.failures().apply(FailurePipeline(options))
        decodedEvents.apply(ClickhousePipeline(options))

        pipeline.run().waitUntilFinish()
    }

    /**
     * Parses and validates the command-line [args] into [MainPipelineOptions] and starts [run].
     */
    @JvmStatic
    fun main(args: Array<String>) {
        val optionsFactory = PipelineOptionsFactory
            .fromArgs(*args)
            .withValidation()
        run(optionsFactory.`as`(MainPipelineOptions::class.java))
    }
}
Clickhouse管道
/**
 * Joins [DetailViewEvent]s with per-user metadata and writes the result to ClickHouse.
 *
 * Steps performed by [expand]:
 *  1. re-window the input into the global window with a repeating 5-second
 *     processing-time trigger (discarding fired panes, zero allowed lateness);
 *  2. key every event by `userInfo.userId`;
 *  3. look up [UserMetadata] for each user id through [UserMetadataEnricher] and key it by `userId`;
 *  4. join both keyed collections with `CoGroupByKey`;
 *  5. build one [ClickHouseModel] per joined key, convert it to a `Row` and write it with `ClickHouseIO`.
 *
 * The `println` calls are diagnostic output kept from the original post.
 */
class ClickhousePipeline(private val options: MainPipelineOptions) :
    PTransform<PCollection<DetailViewEvent>, PDone>() {

    override fun expand(events: PCollection<DetailViewEvent>): PDone {
        val windowedEvents = events.apply(
            "Window",
            Window
                .into<DetailViewEvent>(GlobalWindows())
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(5))
                    )
                )
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes()
        )

        // Left side of the join: events keyed by user id.
        val userIdDetailViewEvents = windowedEvents.apply(
            MapElements.via(
                object : SimpleFunction<DetailViewEvent, KV<String, DetailViewEvent>>() {
                    override fun apply(input: DetailViewEvent): KV<String, DetailViewEvent> {
                        return KV.of(input.userInfo.userId, input)
                    }
                }
            )
        )

        // Right side of the join: user ids -> metadata lookup -> metadata keyed by user id.
        val userMetaData = userIdDetailViewEvents
            .apply(
                MapElements.via(
                    object : SimpleFunction<KV<String, DetailViewEvent>, String>() {
                        override fun apply(input: KV<String, DetailViewEvent>): String {
                            return checkNotNull(input.key) { "keyed event has no user id" }
                        }
                    }
                )
            )
            .apply(UserMetadataEnricher(options))
            .apply(
                ParDo.of(
                    object : DoFn<UserMetadata, KV<String, UserMetadata>>() {
                        @ProcessElement
                        fun processElement(
                            @Element data: UserMetadata,
                            out: OutputReceiver<KV<String, UserMetadata>>
                        ) {
                            println("User:: ${data}") // Not printed!!
                            out.output(KV.of(data.userId, data))
                        }
                    }
                )
            )

        val sourceTag = object : TupleTag<DetailViewEvent>() {}
        val userMetadataTag = object : TupleTag<UserMetadata>() {}

        val joinedPipeline: PCollection<KV<String, CoGbkResult>> =
            KeyedPCollectionTuple.of(sourceTag, userIdDetailViewEvents)
                .and(userMetadataTag, userMetaData)
                .apply(CoGroupByKey.create())

        val enrichedData = joinedPipeline.apply(
            ParDo.of(
                object : DoFn<KV<String, CoGbkResult>, ClickHouseModel>() {
                    @ProcessElement
                    fun processElement(
                        @Element data: KV<String, CoGbkResult>,
                        out: OutputReceiver<ClickHouseModel>
                    ) {
                        val source = data.value.getAll(sourceTag)
                        val userMetadataSource = data.value.getAll(userMetadataTag)

                        println("==========================")
                        for (metadata in userMetadataSource) {
                            println("Metadata:: $metadata") // This is always empty
                        }
                        for (event in source) {
                            println("Event:: $event")
                        }
                        println("==========================")

                        // Either side of the grouped result may be empty for a given key,
                        // so read both with firstOrNull() instead of calling next() on a
                        // possibly empty iterator (which would throw NoSuchElementException).
                        val sourceEvent = source.firstOrNull() ?: return
                        val userMetadataEvent = userMetadataSource.firstOrNull() ?: return

                        // Only the first event and first metadata row per key are used.
                        out.output(
                            ClickHouseModel(
                                eventType = sourceEvent.eventType,
                                userMetadata = userMetadataEvent
                            )
                        )
                    }
                }
            )
        )

        val clickhouseData = enrichedData.apply(
            ParDo.of(
                object : DoFn<ClickHouseModel, Row>() {
                    @ProcessElement
                    fun processElement(context: ProcessContext) {
                        val data = context.element()
                        context.output(data.toSchema())
                    }
                }
            )
        )

        return clickhouseData
            .setRowSchema(ClickHouseModel.schemaType())
            .apply(
                ClickHouseIO.write(
                    "jdbc:clickhouse://127.0.0.1:8123/test?password=example",
                    "clickhouse_test"
                )
            )
    }
}
UserMetadataEnricher
/**
 * Looks up one [UserMetadata] per matching row in the MySQL `user` table for every
 * user id in the input collection, using `JdbcIO.readAll`.
 *
 * Each input string is bound as the single `?` parameter of the query; result columns
 * 1..3 (`id`, `name`, `gender`) are passed positionally to the [UserMetadata] constructor.
 *
 * NOTE(review): an answer further down in this document reports that, on an unbounded
 * input, this read only emits results when `.withOutputParallelization(false)` is set —
 * confirm against the JdbcIO documentation before relying on the default.
 */
class UserMetadataEnricher(private val options: MainPipelineOptions) :
    PTransform<PCollection<String>, PCollection<UserMetadata>>() {

    @Throws(Exception::class)
    override fun expand(events: PCollection<String>): PCollection<UserMetadata> {
        val dataSource = JdbcIO.DataSourceConfiguration
            .create("com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/beam-test")
            .withUsername("root")
            .withPassword("example")

        val metadataLookup = JdbcIO.readAll<String, UserMetadata>()
            .withDataSourceConfiguration(dataSource)
            .withQuery("select id,name,gender from user where id = ?")
            .withParameterSetter { userId: String, statement: PreparedStatement ->
                statement.setString(1, userId)
            }
            .withCoder(SerializableCoder.of(UserMetadata::class.java))
            .withRowMapper { row ->
                println("RowMapper:: ${row.getString(1)}") // printed!!
                UserMetadata(
                    row.getString(1),
                    row.getString(2),
                    row.getString(3)
                )
            }

        return events.apply(metadataLookup)
    }
}
输出
RowMapper:: test-02
RowMapper:: test-01
==========================
Event:: DetailViewEvent(...)
==========================
==========================
Event:: DetailViewEvent(...)
==========================
更新 1(全局窗口到固定窗口)
使用 AfterProcessingTime
我已经更改了窗口设置,并在赋值给 userIdDetailViewEvents 的 SimpleFunction 中添加了打印语句。
// Excerpt: 30-second fixed windows for FoodDetailViewEvent.
Window.into<FoodDetailViewEvent>(FixedWindows.of(Duration.standardSeconds(30)))
// Fire repeatedly, 1 second of processing time after the first element of each pane.
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
Duration.standardSeconds(1)
)
)
)
// No late data is accepted; each fired pane is discarded after firing.
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes()
// The parenthesis below closes an enclosing call that is not part of this excerpt.
)
它打印:
userIdDetailViewEvents Called
userIdDetailViewEvents Called
RowMapper:: test-02
RowMapper:: test-01
==========================
Event:: DetailViewEvent(...)
==========================
==========================
Event:: DetailViewEvent(...)
==========================
使用 AfterWatermark
// Excerpt: 30-second fixed windows for FoodDetailViewEvent.
Window.into<FoodDetailViewEvent>(FixedWindows.of(Duration.standardSeconds(30)))
// Fire repeatedly once the watermark passes the end of the window.
.triggering(
Repeatedly.forever(
AfterWatermark.pastEndOfWindow()
)
)
// No late data is accepted; each fired pane is discarded after firing.
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes()
产出
userIdDetailViewEvents Called
userIdDetailViewEvents Called
RowMapper:: test-02
RowMapper:: test-01
我认为使用 AfterWatermark 是正确的,但它挂在某个地方...我想是 JdbcIO
GlobalWindow 永远不会关闭,因此它不适合像 Pub/Sub 这样的无界数据源。我建议改用 FixedWindow(<Some time range>)。
您可以在此处阅读有关 windows 的更多信息 https://beam.apache.org/documentation/programming-guide/#windowing
你是对的,在标准配置下,对于无界集合它不会返回结果。
只要你有一个可以正常工作的窗口/触发器组合,诀窍就是在 JdbcIO.<>readAll() 调用上设置 .withOutputParallelization(false),如下所示:
// Java sketch from the answer: applies JdbcIO.readAll with output parallelization disabled.
// The `<>` type arguments are left unspecified here, and getConfiguration() /
// getStreamingQuery() are not shown in this document.
p.apply("Read from JDBC", JdbcIO.<>readAll()
.withDataSourceConfiguration(getConfiguration())
.withQuery(getStreamingQuery())
.withParameterSetter((JdbcIO.PreparedStatementSetter<String>) (element, preparedStatement) -> {
// Prepare statement here.
})
.withRowMapper((JdbcIO.RowMapper<>) results -> {
// Map results here.
// withOutputParallelization(false) below is the setting the answer recommends.
}).withOutputParallelization(false));
我为此苦苦挣扎了几个小时,后来在这篇文章中找到了这段代码,终于成功了。
我使用了
// Excerpt (Java): global window whose trigger fires repeatedly as soon as a pane
// holds at least one element; fired panes are discarded.
Window.into(new GlobalWindows()).triggering(
Repeatedly.forever(AfterPane.elementCountAtLeast(1))
)
// The final parenthesis closes an enclosing call that is not part of this excerpt.
.discardingFiredPanes())
让我的触发器一次处理一个元素。