如何使用 Kiba-ETL 转换嵌套的 JSON-payloads?

How to transform nested JSON-payloads with Kiba-ETL?

我想使用 Kiba-ETL 将嵌套的 JSON-有效负载转换为关系表。这是一个简化的伪JSON-payload:

{
  "bookings": [
    {
      "bookingNumber": "1111",
      "name": "Booking 1111",
      "services": [
        {
          "serviceNumber": "45",
          "serviceName": "Extra Service"
        }
      ]
    },
    {
      "bookingNumber": "2222",
      "name": "Booking 2222",
      "services": [
        {
          "serviceNumber": "1",
          "serviceName": "Super Service"
        },
        {
          "serviceNumber": "2",
          "serviceName": "Bonus Service"
        }
      ]
    }
  ]
}

如何将此有效负载转换为两个表:

我在 wiki、博客等 Kiba::Common::Transforms::EnumerableExploder 的帮助下阅读了关于生成多行的文章

你会通过生成多行(预订和多项服务)来解决我的用例,还是会实施一个 Destination 来接收整个预订并调用一些子目的地(即创建或更新服务)?

这里是 Kiba 的作者!

这是一个常见的要求,但它可能(这不是 Kiba 特有的)处理起来或多或少有些复杂。以下是您需要考虑的几点。

外键的处理

这里的主要问题是,一旦插入,您将希望保留服务和预订之间的关系。

外键使用业务键

处​​理此问题的第一个(最简单的)方法是在 "booking number" 上使用 foreign-key 约束,并确保在每个服务行中插入该预订编号,以便您可以利用稍后在您的查询中。如果您这样做(请参阅 ),您必须在预订 table 目标中的 "booking number" 上设置 unique-constraint。

使用主键的外键

如果您更喜欢 booking_id 指向 bookings table id 键,事情会有点复杂。

如果这是针对空 table 的 one-off 导入,我建议您使用类似以下内容任意强制主键:

transform do |r|
  @row_index ||= 0
  @row_index += 1
  r.merge(id: @row_index)
end

如果这不是 one-off 导入,您将必须: * 在第一次通过时更新预订 * 在第二遍中,look-up(通过 SQL 查询)"bookings" 找出要存储在 booking_id 中的 id 是什么,然后更新服务

如您所见,它需要做更多的工作,所以如果您对此没有强烈要求,请坚持使用选项 1(尽管选项 2 在长 运行 上更可靠)。

实施示例(使用 Kiba Pro 和业务密钥)

实现此目的的最简单方法(假设您的目标是 Postgres)是使用 Kiba Pro 的 SQL Bulk Insert/Upsert destination

它会这样走(单程):

extend Kiba::DSLExtensions::Config
config :kiba, runner: Kiba::StreamingRunner

source Kiba::Common::Sources::Enumerable, -> { Dir["input/*.json"] }

transform { |r| JSON.parse(IO.read(r)).fetch('bookings') }

transform Kiba::Common::Transforms::EnumerableExploder

# SNIP (remapping / renaming of fields etc)

first_destination = nil

destination Kiba::Pro::Destinations::SQLBulkInsert,
  row_pre_processor: -> (row) { row.except("services") },
  dataset: -> (dataset) {
    dataset.insert_conflict(target: :booking_number)
  },
  after_read: -> (d) { first_destination = d }

destination Kiba::Pro::Destinations::SQLBulkInsert,
  row_pre_processor: -> (row) { row.fetch("services") },
  dataset: -> (dataset) {
    dataset.insert_conflict(target: :service_number)
  },
  before_flush: -> { first_destination.flush }

这里我们遍历每个输入文件,解析它并获取 "bookings",然后为 "bookings" 的每个元素生成一行。

我们有 2 个目的地,做 "upsert"(插入或更新),加上一个技巧来确保我们在插入子行之前保存父行,以避免由于缺少指向的记录而导致失败。

您当然可以自己实现,但这需要一些工作!

如果您需要使用基于 primary-key 的外键,您将(可能)分成两遍(每个目的地一个),然后在中间添加某种形式的查找。

结论

我知道这不是微不足道的(取决于你需要什么,以及你是否使用 Kiba Pro),但至少我分享了我在这种情况下使用的模式.

希望对您有所帮助!