Beam JdbcIO.readAll doesn't seem to return results

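I'm trying to build an event pipeline with Apache Beam. I want to read streaming data from GCP Pub/Sub, use the ID in each event to read the related metadata from MySQL, then join the two streams and write the result to my ClickHouse database.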

However, JdbcIO.readAll() doesn't seem to return its stream. As you can see in ClickhousePipeline, I try to join the two PCollections by applying CoGroupByKey.create(), but userMetaData is empty, and the ParDo chained after UserMetadataEnricher() is never executed.

In withRowMapper in UserMetadataEnricher, I added a println() to check whether it runs. It does run and prints the results from my database, but the data is not passed on to the next step of the pipeline.

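I guess the problem is related to windowing: when I tested without windowing, it worked. But PubsubIO produces an unbounded PCollection, so I have to apply a window to use JdbcIO.readAll(), right? I don't know how to solve this. Hoping for an answer soon!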

Main pipeline

object MainPipeline {
  @JvmStatic
  fun run(options: MainPipelineOptions) {
    val p = Pipeline.create(options)

    val events = p
      .apply(
        "Read DetailViewEvent PubSub",
        PubsubIO.readStrings().fromSubscription(options.inputSubscription)
      )
      .apply(
        "Extract messages",
        ParseJsons.of(FoodDetailViewEvent::class.java)
          .exceptionsInto(
            TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings())
          )
          .exceptionsVia { KV.of(it.element(), it.exception().javaClass.canonicalName) }
      )

    val validEvents =
      events.output().setCoder(SerializableCoder.of(FoodDetailViewEvent::class.java))
    val invalidEvents = events.failures()

    invalidEvents.apply(FailurePipeline(options))
    validEvents.apply(ClickhousePipeline(options))

    p.run().waitUntilFinish()
  }

  @JvmStatic
  fun main(args: Array<String>) {
    val options = PipelineOptionsFactory
      .fromArgs(*args)
      .withValidation()
      .`as`(MainPipelineOptions::class.java)

    run(options)
  }
}

ClickHouse pipeline

class ClickhousePipeline(private val options: MainPipelineOptions) :
  PTransform<PCollection<DetailViewEvent>, PDone>() {

  override fun expand(events: PCollection<DetailViewEvent>): PDone {
    val windowedEvents = events
      .apply(
        "Window", Window
          .into<DetailViewEvent>(GlobalWindows())
          .triggering(
            Repeatedly
              .forever(
                AfterProcessingTime
                  .pastFirstElementInPane()
                  .plusDelayOf(Duration.standardSeconds(5))
              )
          )
          .withAllowedLateness(Duration.ZERO).discardingFiredPanes()
      )

    val userIdDetailViewEvents = windowedEvents
      .apply(
        MapElements.via(object :
          SimpleFunction<DetailViewEvent, KV<String, DetailViewEvent>>() {
          override fun apply(input: DetailViewEvent): KV<String, DetailViewEvent> {
            return KV.of(input.userInfo.userId, input)
          }
        })
      )

    val userMetaData = userIdDetailViewEvents
      .apply(
        MapElements.via(object :
          SimpleFunction<KV<String, DetailViewEvent>, String>() {
          override fun apply(input: KV<String, DetailViewEvent>): String {
            return input.key!!
          }
        })
      )
      .apply(
        UserMetadataEnricher(options)
      )
      .apply(
        ParDo.of(
          object : DoFn<UserMetadata, KV<String, UserMetadata>>() {
            @ProcessElement
            fun processElement(
              @Element data: UserMetadata,
              out: OutputReceiver<KV<String, UserMetadata>>
            ) {
              println("User:: ${data}") // Not printed!!
              out.output(KV.of(data.userId, data))
            }
          })
      )

    val sourceTag = object : TupleTag<DetailViewEvent>() {}
    val userMetadataTag = object : TupleTag<UserMetadata>() {}

    val joinedPipeline: PCollection<KV<String, CoGbkResult>> =
      KeyedPCollectionTuple.of(sourceTag, userIdDetailViewEvents)
        .and(userMetadataTag, userMetaData)
        .apply(CoGroupByKey.create())

    val enrichedData = joinedPipeline.apply(
      ParDo.of(object : DoFn<KV<String, CoGbkResult>, ClickHouseModel>() {
        @ProcessElement
        fun processElement(
          @Element data: KV<String, CoGbkResult>,
          out: OutputReceiver<ClickHouseModel>
        ) {

          val name = data.key
          val source = data.value.getAll(sourceTag)
          val userMetadataSource = data.value.getAll(userMetadataTag)

          println("==========================")
          for (metadata in userMetadataSource.iterator()) {
            println("Metadata:: $metadata") // This is always empty
          }

          for (event in source.iterator()) {
            println("Event:: $event")
          }
          println("==========================")

          val sourceEvent = source.iterator().next()
          if (userMetadataSource.iterator().hasNext()) {
            val userMetadataEvent = userMetadataSource.iterator().next()
            out.output(
              ClickHouseModel(
                eventType = sourceEvent.eventType,
                userMetadata = userMetadataEvent
              )
            )

          }
        }
      })
    )

    val clickhouseData = enrichedData.apply(
      ParDo.of(object : DoFn<ClickHouseModel, Row>() {
        @ProcessElement
        fun processElement(context: ProcessContext) {
          val data = context.element()
          context.output(
            data.toSchema()
          )
        }
      })
    )

    return clickhouseData
      .setRowSchema(ClickHouseModel.schemaType())
      .apply(
        ClickHouseIO.write(
          "jdbc:clickhouse://127.0.0.1:8123/test?password=example",
          "clickhouse_test"
        )
      )
  }
}

UserMetadataEnricher

class UserMetadataEnricher(private val options: MainPipelineOptions) :
  PTransform<PCollection<String>, PCollection<UserMetadata>>() {

  @Throws(Exception::class)
  override fun expand(events: PCollection<String>): PCollection<UserMetadata> {
    return events
      .apply(
        JdbcIO.readAll<String, UserMetadata>()
          .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create(
              "com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/beam-test"
            )
              .withUsername("root")
              .withPassword("example")
          )
          .withQuery("select id,name,gender from user where id = ?")
          .withParameterSetter { id: String, preparedStatement: PreparedStatement ->
            preparedStatement.setString(1, id)
          }
          .withCoder(
            SerializableCoder.of(
              UserMetadata::class.java
            )
          )
          .withRowMapper {
            println("RowMapper:: ${it.getString(1)}") // printed!!
            UserMetadata(
              it.getString(1),
              it.getString(2),
              it.getString(3)
            )
          }
      )
  }
}


Output

RowMapper:: test-02
RowMapper:: test-01
==========================
Event:: DetailViewEvent(...)
==========================
==========================
Event:: DetailViewEvent(...)
==========================

Update 1 (global window to fixed windows)

Using AfterProcessingTime

I changed my window settings and added a print to the SimpleFunction assigned to userIdDetailViewEvents.

Window.into<FoodDetailViewEvent>(FixedWindows.of(Duration.standardSeconds(30)))
  .triggering(
    Repeatedly.forever(
      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
        Duration.standardSeconds(1)
      )
    )
  )
  .withAllowedLateness(Duration.ZERO)
  .discardingFiredPanes()

It prints:

userIdDetailViewEvents Called
userIdDetailViewEvents Called
RowMapper:: test-02
RowMapper:: test-01
==========================
Event:: DetailViewEvent(...)
==========================
==========================
Event:: DetailViewEvent(...)
==========================

Using AfterWatermark

Window.into<FoodDetailViewEvent>(FixedWindows.of(Duration.standardSeconds(30)))
  .triggering(
    Repeatedly.forever(
      AfterWatermark.pastEndOfWindow()
    )
  )
  .withAllowedLateness(Duration.ZERO)
  .discardingFiredPanes()

Output

userIdDetailViewEvents Called
userIdDetailViewEvents Called
RowMapper:: test-02
RowMapper:: test-01

I think using AfterWatermark is the right approach, but it hangs somewhere... in JdbcIO, I think.

A GlobalWindow never closes, so it is not suitable for an unbounded data source such as Pub/Sub.

I suggest using FixedWindows(<some time range>) instead. You can read more about windowing here: https://beam.apache.org/documentation/programming-guide/#windowing

You are right: with the standard configuration, it does not return results for unbounded collections.

As long as you have a working window/trigger combination, the trick is to set .withOutputParallelization(false) on the JdbcIO.<>readAll() call, as follows:

p.apply("Read from JDBC", JdbcIO.<>readAll()
    .withDataSourceConfiguration(getConfiguration())
    .withQuery(getStreamingQuery())
    .withParameterSetter((JdbcIO.PreparedStatementSetter<String>) (element, preparedStatement) -> {
      // Prepare statement here.
    })
    .withRowMapper((JdbcIO.RowMapper<>) results -> {
      // Map results here.
    })
    .withOutputParallelization(false));
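
Applied to the Kotlin UserMetadataEnricher from the question, the same change might look like the sketch below. It assumes a Beam release in which JdbcIO.ReadAll offers withOutputParallelization. The UserMetadata field names other than userId are guesses, since the question does not show that class, and the unused options parameter is dropped for brevity. By default readAll reshuffles its output to spread rows across workers; the flag turns that step off.

```kotlin
import org.apache.beam.sdk.coders.SerializableCoder
import org.apache.beam.sdk.io.jdbc.JdbcIO
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.values.PCollection
import java.io.Serializable

// Assumed shape of the asker's class; only userId appears in the question.
data class UserMetadata(
  val userId: String,
  val name: String,
  val gender: String
) : Serializable

class UserMetadataEnricher :
  PTransform<PCollection<String>, PCollection<UserMetadata>>() {

  override fun expand(ids: PCollection<String>): PCollection<UserMetadata> =
    ids.apply(
      "Read user metadata",
      JdbcIO.readAll<String, UserMetadata>()
        .withDataSourceConfiguration(
          JdbcIO.DataSourceConfiguration
            .create("com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/beam-test")
            .withUsername("root")
            .withPassword("example")
        )
        .withQuery("select id,name,gender from user where id = ?")
        // Bind each incoming user ID to the single query parameter.
        .withParameterSetter(
          JdbcIO.PreparedStatementSetter<String> { id, statement ->
            statement.setString(1, id)
          }
        )
        // Map one result row to one UserMetadata element.
        .withRowMapper(
          JdbcIO.RowMapper<UserMetadata> { row ->
            UserMetadata(row.getString(1), row.getString(2), row.getString(3))
          }
        )
        .withCoder(SerializableCoder.of(UserMetadata::class.java))
        // Skip the default reshuffle of the output, as the answer above suggests.
        .withOutputParallelization(false)
    )
}
```

The rest of ClickhousePipeline can stay as it is; only the readAll configuration changes.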

I struggled with this for a few hours, but then found the code in this article, and it finally worked.

I used

Window.into(new GlobalWindows())
    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
    .discardingFiredPanes()

so that my trigger processes one element at a time.
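
In Kotlin, to match the question's code, that windowing could be wrapped roughly as follows. This is only a sketch: perElementPanes is a hypothetical helper name, and the explicit withAllowedLateness(Duration.ZERO) is an addition that mirrors the question's own window settings.

```kotlin
import org.apache.beam.sdk.transforms.windowing.AfterPane
import org.apache.beam.sdk.transforms.windowing.GlobalWindows
import org.apache.beam.sdk.transforms.windowing.Repeatedly
import org.apache.beam.sdk.transforms.windowing.Window
import org.apache.beam.sdk.values.PCollection
import org.joda.time.Duration

// Hypothetical helper: keeps the stream in the global window, but fires a pane
// as soon as at least one element has arrived, then discards what was fired.
fun <T> perElementPanes(input: PCollection<T>): PCollection<T> =
  input.apply(
    "Fire per element",
    Window.into<T>(GlobalWindows())
      .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
      .withAllowedLateness(Duration.ZERO)
      .discardingFiredPanes()
  )
```

It would be applied to the events before they are keyed by user ID, in place of the "Window" step in ClickhousePipeline.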