How to stream insert JSON array to BigQuery table in Apache Beam

My Apache Beam application receives a message as a JSON array, but each element should be inserted as a separate row into a BigQuery table. How can I support this use case in Apache Beam? Can I split the array and insert each row into the table one by one?

Example JSON message:

[
  {"id": 1, "name": "post1", "price": 10},
  {"id": 2, "name": "post2", "price": 20},
  {"id": 3, "name": "post3", "price": 30}
]

BigQuery table schema:

[
    {
      "mode": "REQUIRED",
      "name": "id",
      "type": "INT64"
    },
    {
      "mode": "REQUIRED",
      "name": "name",
      "type": "STRING"
    },
    {
      "mode": "REQUIRED",
      "name": "price",
      "type": "INT64"
    }
]
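
In other words, each element of the JSON array should become one row under this schema. As a rough illustration (values taken from the sample message above; `TableRow` is the row type used by Beam's BigQuery sink), the first element would correspond to:

    import com.google.api.services.bigquery.model.TableRow

    // First element of the sample message expressed against the schema:
    // id and price are INT64, name is STRING, and all three fields are REQUIRED.
    val row: TableRow = new TableRow()
      .set("id", 1L)
      .set("name", "post1")
      .set("price", 10L)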

Here is my solution. I convert the JSON string into a list once, then `c.output` the records one by one. My code is in Scala, but you can do the same thing in Java.

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    import org.apache.beam.sdk.io.kafka.KafkaRecord
    import org.apache.beam.sdk.transforms.DoFn
    import org.apache.beam.sdk.transforms.DoFn.ProcessElement
    import org.slf4j.{Logger, LoggerFactory}

    case class MyTranscationRecord(id: String, name: String, price: Int)
    case class MyTranscation(recordList: List[MyTranscationRecord])

    // Splits one Kafka message (a JSON array) into individual records and emits them one by one.
    class ConvertJSONTextToMyRecord extends DoFn[KafkaRecord[java.lang.Long, String], MyTranscationRecord]() {
      private val logger: Logger = LoggerFactory.getLogger(classOf[ConvertJSONTextToMyRecord])

      @ProcessElement
      def processElement(c: ProcessContext): Unit = {
        try {
          val mapper: ObjectMapper = new ObjectMapper()
            .registerModule(DefaultScalaModule)
          val messageText = c.element.getKV.getValue
          // The incoming message is a top-level JSON array, so parse it into an array of
          // records and wrap it in MyTranscation.
          val transaction: MyTranscation =
            MyTranscation(mapper.readValue(messageText, classOf[Array[MyTranscationRecord]]).toList)
          logger.info(s"successfully converted to an EPC transaction = $transaction")
          // Emit each record separately so downstream transforms see one row at a time.
          for (record <- transaction.recordList) {
            c.output(record)
          }
        } catch {
          case e: Exception =>
            logger.error(s"failed to convert message: ${e.getLocalizedMessage}", e)
        }
      }
    }
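
To complete the picture, here is a sketch of how this DoFn could be wired into a pipeline that converts each record into a `TableRow` and streams it into BigQuery. This is a minimal sketch and not part of the original answer: the broker address, topic name, table reference, and the `ToTableRow` helper are illustrative, and the BigQueryIO settings are just one reasonable choice for streaming inserts into an existing table.

    import com.google.api.services.bigquery.model.TableRow
    import org.apache.beam.sdk.Pipeline
    import org.apache.beam.sdk.coders.SerializableCoder
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
    import org.apache.beam.sdk.io.kafka.KafkaIO
    import org.apache.beam.sdk.options.PipelineOptionsFactory
    import org.apache.beam.sdk.transforms.{DoFn, ParDo}
    import org.apache.beam.sdk.transforms.DoFn.ProcessElement
    import org.apache.kafka.common.serialization.{LongDeserializer, StringDeserializer}

    // Converts one parsed record into the TableRow shape expected by the schema above.
    class ToTableRow extends DoFn[MyTranscationRecord, TableRow]() {
      @ProcessElement
      def processElement(c: ProcessContext): Unit = {
        val r = c.element
        c.output(new TableRow()
          .set("id", r.id.toLong)   // the schema declares id as INT64, the case class keeps it as String
          .set("name", r.name)      // STRING
          .set("price", r.price))   // INT64
      }
    }

    object StreamJsonArrayToBigQuery {
      def main(args: Array[String]): Unit = {
        val options = PipelineOptionsFactory.fromArgs(args: _*).withValidation().create()
        val pipeline = Pipeline.create(options)

        pipeline
          .apply("ReadFromKafka", KafkaIO.read[java.lang.Long, String]()
            .withBootstrapServers("kafka:9092")             // illustrative broker address
            .withTopic("transactions")                      // illustrative topic name
            .withKeyDeserializer(classOf[LongDeserializer])
            .withValueDeserializer(classOf[StringDeserializer]))
          .apply("SplitJsonArray", ParDo.of(new ConvertJSONTextToMyRecord()))
          .setCoder(SerializableCoder.of(classOf[MyTranscationRecord]))
          .apply("ToTableRow", ParDo.of(new ToTableRow()))
          .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table")           // illustrative table reference
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS))

        pipeline.run()
      }
    }

Because BigQueryIO writes every TableRow it receives, splitting the array in ConvertJSONTextToMyRecord is all that is needed to end up with one BigQuery row per JSON element.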