Kafka JDBC sink 不处理空值

Kafka JDBC sink no handling null values

我正在尝试使用 Kafka JDBC 接收器连接器插入数据,但它返回给我这个异常。

org.apache.kafka.connect.errors.DataException: Invalid null value for required INT64 field

记录具有以下架构:

[
  {
    "schema": {
      "type": "struct",
      "fields": [
        {
          "type": "int64",
          "field": "ID"
        },
        {
          "type": "int64",
          "field": "TENANT_ID"
        },
        {
          "type": "string",
          "field": "ITEM"
        },
        {
          "type": "int64",
          "field": "TIPO"
        },
        {
          "type": "int64",
          "field": "BUSINESS_CONCEPT"
        },
        {
          "type": "string",
          "field": "ETIQUETA"
        },
        {
          "type": "string",
          "field": "VALOR"
        },
        {
          "type": "string",
          "field": "GG_T_TYPE"
        },
        {
          "type": "string",
          "field": "GG_T_TIMESTAMP"
        },
        {
          "type": "string",
          "field": "TD_T_TIMESTAMP"
        },
        {
          "type": "string",
          "field": "POS"
        }
      ]
    },
    "payload": {
      "ID": 298457,
      "TENANT_ID": 83,
      "ITEM": "0-0-0",
      "TIPO": 4,
      "BUSINESS_CONCEPT": null,
      "ETIQUETA": "¿Cuándo ha ocurrido?",
      "VALOR": "2019-05-31T10:33:00Z",
      "GG_T_TYPE": "I",
      "GG_T_TIMESTAMP": "2019-05-31 14:35:19.002221",
      "TD_T_TIMESTAMP": "2019-06-05T10:46:55.0106",
      "POS": "00000000530096832544"
    }
  }
]

如您所见,值 BUSINESS_CONCEPT 可以是 null。它是唯一的 null 值,所以我想异常是由于该字段引起的。我怎样才能让接收器插入值 null?

您需要更改

的定义
{
  "type": "int64",
  "field": "BUSINESS_CONCEPT"
}

{
  "type": ["null", "int64"],
  "field": "BUSINESS_CONCEPT"
}

以便将 BUSINESS_CONCEPT 视为可选字段。