如何在kafka接收器连接器中添加带有kafka消息时间戳的列
How to add column with the kafka message timestamp in kafka sink connector
我正在使用 properties/json 文件配置我的连接器,当它从源连接器读取消息但没有成功时,我试图添加一个包含 kafka 时间戳的时间戳列。
我尝试添加 transforms
,但它始终为空,而我的接收器连接器 "big query" 它 return 我出错了
Failed to update table schema
我确实将这些配置放在了 bigquery 连接器属性中
transforms=InsertField
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
我的源 Config Sap 连接器
{
"name": "sap",
"config": {
"connector.class": "com.sap.kafka.connect.source.hana.HANASourceConnector",
"tasks.max": "10",
"topics": "mytopic",
"connection.url": "jdbc:sap://IP:30015/",
"connection.user": "user",
"connection.password": "pass",
"group.id":"589f5ff5-1c43-46f4-bdd3-66884d61m185",
"mytopic.table.name": "\"schema\".\"mytable\""
}
}
我的接收器连接器 BigQuery
name=bigconnect
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
sanitizeTopics=true
autoCreateTables=true
autoUpdateSchemas=true
schemaRetriever=com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
schemaRegistryLocation=http://localhost:8081
bufferSize=100000
maxWriteSize=10000
tableWriteWait=1000
project=kafka-test-217517
topics=mytopic
datasets=.*=sap_dataset
keyfile=/opt/bgaccess.json
transforms=InsertField
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
我猜你的错误来自 BigQuery,而不是 Kafka Connect。
例如,以独立模式启动 Connect Console Consumer,您会看到类似
的消息
Struct{...,fieldtime=Fri Nov 16 07:38:19 UTC 2018}
测试 connect-standalone ./connect-standalone.properties ./connect-console-sink.properties
我有一个包含 Avro 数据的输入主题...相应地更新您自己的设置
连接-standalone.properties
bootstrap.servers=kafka:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
key.converter.schemas.enable=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java
connect-console-sink.properties
name=local-console-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=input-topic
transforms=InsertField
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
旧答案
我想我明白了背后的问题
首先,您不能在任何源连接器中使用 InsertField 转换,因为消息的时间戳值是在写入主题时分配的,因此连接器无法知道它,
对于 JDBC 连接器,有这张票
https://github.com/confluentinc/kafka-connect-jdbc/issues/311
并且在 sap 源连接器中也无法正常工作。
第二个 BigQuery 连接器有一个错误,不允许使用 InsertField 向每个 table 添加时间戳,如此处
https://github.com/wepay/kafka-connect-bigquery/issues/125#issuecomment-439102994
因此,如果您想使用 bigquery 作为输出,目前唯一的解决方案是手动编辑每个 table 的模式以在加载 cink 连接器之前添加列
更新 2018-12-03
始终在 SINK 连接器中添加消息时间戳的最终解决方案。假设您要将时间戳添加到接收器连接器
的每个 table
在您的 SOURCE CONNECTOR 中放置此配置
"transforms":"InsertField"
"transforms.InsertField.timestamp.field":"fieldtime",
"transforms.InsertField.type":"org.apache.kafka.connect.transforms.InsertField$Value"
这将为每个源 tables
添加一个名为 "fieldtime" 的列名
在您的 SINK CONNECTOR 中放置这些配置
"transforms":"InsertField,DropField",
"transforms.DropField.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.DropField.blacklist":"fieldtime",
"transforms.InsertSource.timestamp.field":"kafka_timestamp",
"transforms.InsertField.timestamp.field":"fieldtime",
"transforms.InsertField.type":"org.apache.kafka.connect.transforms.InsertField$Value"
这实际上会删除列 fieldtime 并使用消息的时间戳再次添加它
此解决方案将自动添加具有正确值的列,无需任何添加操作
我正在使用 properties/json 文件配置我的连接器,当它从源连接器读取消息但没有成功时,我试图添加一个包含 kafka 时间戳的时间戳列。
我尝试添加 transforms
,但它始终为空,而我的接收器连接器 "big query" 它 return 我出错了
Failed to update table schema
我确实将这些配置放在了 bigquery 连接器属性中
transforms=InsertField
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
我的源 Config Sap 连接器
{
"name": "sap",
"config": {
"connector.class": "com.sap.kafka.connect.source.hana.HANASourceConnector",
"tasks.max": "10",
"topics": "mytopic",
"connection.url": "jdbc:sap://IP:30015/",
"connection.user": "user",
"connection.password": "pass",
"group.id":"589f5ff5-1c43-46f4-bdd3-66884d61m185",
"mytopic.table.name": "\"schema\".\"mytable\""
}
}
我的接收器连接器 BigQuery
name=bigconnect
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
sanitizeTopics=true
autoCreateTables=true
autoUpdateSchemas=true
schemaRetriever=com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
schemaRegistryLocation=http://localhost:8081
bufferSize=100000
maxWriteSize=10000
tableWriteWait=1000
project=kafka-test-217517
topics=mytopic
datasets=.*=sap_dataset
keyfile=/opt/bgaccess.json
transforms=InsertField
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
我猜你的错误来自 BigQuery,而不是 Kafka Connect。
例如,以独立模式启动 Connect Console Consumer,您会看到类似
的消息Struct{...,fieldtime=Fri Nov 16 07:38:19 UTC 2018}
测试 connect-standalone ./connect-standalone.properties ./connect-console-sink.properties
我有一个包含 Avro 数据的输入主题...相应地更新您自己的设置
连接-standalone.properties
bootstrap.servers=kafka:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
key.converter.schemas.enable=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java
connect-console-sink.properties
name=local-console-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=input-topic
transforms=InsertField
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
旧答案 我想我明白了背后的问题
首先,您不能在任何源连接器中使用 InsertField 转换,因为消息的时间戳值是在写入主题时分配的,因此连接器无法知道它,
对于 JDBC 连接器,有这张票
https://github.com/confluentinc/kafka-connect-jdbc/issues/311
并且在 sap 源连接器中也无法正常工作。
第二个 BigQuery 连接器有一个错误,不允许使用 InsertField 向每个 table 添加时间戳,如此处
https://github.com/wepay/kafka-connect-bigquery/issues/125#issuecomment-439102994
因此,如果您想使用 bigquery 作为输出,目前唯一的解决方案是手动编辑每个 table 的模式以在加载 cink 连接器之前添加列
更新 2018-12-03 始终在 SINK 连接器中添加消息时间戳的最终解决方案。假设您要将时间戳添加到接收器连接器
的每个 table在您的 SOURCE CONNECTOR 中放置此配置
"transforms":"InsertField"
"transforms.InsertField.timestamp.field":"fieldtime",
"transforms.InsertField.type":"org.apache.kafka.connect.transforms.InsertField$Value"
这将为每个源 tables
添加一个名为 "fieldtime" 的列名在您的 SINK CONNECTOR 中放置这些配置
"transforms":"InsertField,DropField",
"transforms.DropField.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.DropField.blacklist":"fieldtime",
"transforms.InsertSource.timestamp.field":"kafka_timestamp",
"transforms.InsertField.timestamp.field":"fieldtime",
"transforms.InsertField.type":"org.apache.kafka.connect.transforms.InsertField$Value"
这实际上会删除列 fieldtime 并使用消息的时间戳再次添加它
此解决方案将自动添加具有正确值的列,无需任何添加操作