How to convert String to Timestamp in kafka connect using transforms and insert into postgres using jdbc sink connector from confluent?
Below is my kafka-connect-sink.properties file. I am using Confluent 6.0.1.
name=enba-sink-postgres
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://IP:PORT/DB
connection.user=USERNAME
connection.password=PASSWORD
tasks.max=1
topics=postgresInsert
insert.mode=INSERT
table.name.format=schema."tableName"
auto.create=false
key.converter.schema.registry.url=http://localhost:8081
key.converter.schemas.enable=false
value.converter.schemas.enable=false
config.action.reload=restart
value.converter.schema.registry.url=http://localhost:8081
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
print.key=true
# Transforms
transforms=TimestampConverter
transforms.TimestampConverter.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.TimestampConverter.format=yyyy-MM-dd HH:mm:ss
transforms.TimestampConverter.target.type=Timestamp
transforms.TimestampConverter.target.field=DATE_TIME
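The converter classes themselves are not set here, so presumably they come from the worker config. If they had to be set per connector for Avro with Confluent Schema Registry, the lines would typically look like this (a sketch, not part of my current file):
# sketch: per-connector Avro converters, only needed if not already set in the worker config
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter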
I am using Avro data, and the schema is:
{"type":"record","name":"log","namespace":"transform.name.space","fields":[
  {"name":"TRANSACTION_ID","type":{"type":"string","avro.java.string":"String"},"avro.java.string":"String"},
  {"name":"MSISDN","type":{"type":"string","avro.java.string":"String"},"avro.java.string":"String"},
  {"name":"TRIGGER_NAME","type":{"type":"string","avro.java.string":"String"},"avro.java.string":"String"},
  {"name":"W_ID","type":{"type":"string","avro.java.string":"String"},"avro.java.string":"String"},
  {"name":"STEP","type":{"type":"string","avro.java.string":"String"},"avro.java.string":"String"},
  {"name":"REWARD_ID","type":{"type":"string","avro.java.string":"String"},"avro.java.string":"String"},
  {"name":"CAM_ID","type":{"type":"string","avro.java.string":"String"},"avro.java.string":"String"},
  {"name":"STATUS","type":{"type":"string","avro.java.string":"String"},"avro.java.string":"String"},
  {"name":"COMMENTS","type":{"type":"string","avro.java.string":"String"},"avro.java.string":"String"},
  {"name":"CCR_JSON","type":{"type":"string","avro.java.string":"String"},"avro.java.string":"String"},
  {"name":"DATE_TIME","type":{"type":"string","avro.java.string":"String"},"avro.java.string":"String"}
]}
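For reference, a value conforming to this schema looks roughly like the following (every field except DATE_TIME is a made-up placeholder):
{"TRANSACTION_ID": "txn-001", "MSISDN": "9999999999", "TRIGGER_NAME": "trigger-1", "W_ID": "w-1", "STEP": "1", "REWARD_ID": "r-1", "CAM_ID": "c-1", "STATUS": "SUCCESS", "COMMENTS": "placeholder", "CCR_JSON": "{}", "DATE_TIME": "2022-12-15 14:38:02"}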
The DATE_TIME column in Postgres is a Timestamp type, and from Avro I have tried sending it both as a String date and as a long. For example:
DATE_TIME = 2022-12-15 14:38:02
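(For the long variant, my understanding is that DATE_TIME would have to be produced as epoch milliseconds, in which case TimestampConverter should not need a format at all, roughly like this; an untested sketch, not what my file above uses:)
# untested sketch: long epoch-millis input, no format property needed
transforms=TimestampConverter
transforms.TimestampConverter.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.TimestampConverter.field=DATE_TIME
transforms.TimestampConverter.target.type=Timestamp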
The problem is that if I do not use the transform, I get this error:
ERROR: column "DATE_TIME" is of type timestamp with time zone but expression is of type character varying
And if I use the transform mentioned above, the error is:
[2021-02-06 21:47:41,897] ERROR Error encountered in task enba-sink-postgres-0. Executing stage 'TRANSFORMATION' with class 'org.apache.kafka.connect.transforms.TimestampConverter$Value', where consumed record is {topic='enba', partition=0, offset=69, timestamp=1612628261605, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter:66)
org.apache.kafka.connect.errors.ConnectException: Schema Schema{com.package.kafkaconnect.Enbalog:STRUCT} does not correspond to a known timestamp type format
I got it working by using the following:
# Transforms
transforms= timestamp
transforms.timestamp.type= org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.timestamp.target.type= Timestamp
transforms.timestamp.field= DATE_TIME
transforms.timestamp.format= yyyy-MM-dd HH:mm:ss
For some reason, transforms=TimestampConverter was not working.
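Comparing the two configs, the difference that seems to matter is the property name: TimestampConverter takes the field to convert via field, not target.field. With target.field the SMT apparently ends up trying to convert the whole record value (a STRUCT), which would explain the "does not correspond to a known timestamp type format" error above. Under that assumption, the original alias should also work when written like this (untested sketch):
# sketch: same alias as before, but with 'field' instead of 'target.field'
transforms=TimestampConverter
transforms.TimestampConverter.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.TimestampConverter.field=DATE_TIME
transforms.TimestampConverter.target.type=Timestamp
transforms.TimestampConverter.format=yyyy-MM-dd HH:mm:ss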