Forwarding messages from Kafka to Elasticsearch and PostgreSQL
I am trying to build an infrastructure in which I need to forward messages from one Kafka topic to both Elasticsearch and PostgreSQL. My infrastructure is shown in the picture below, and everything runs on the same host. Logstash does some anonymization and some mutation and sends the documents back to Kafka as JSON. Kafka should then forward the messages to PostgreSQL and Elasticsearch.
Everything works fine except for the connection to PostgreSQL, where I have run into some problems.
My configuration files look like this:
sink-quickstart-sqlite.properties
name=jdbc-test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#table.name.format=${topic}
topics=processed
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable:true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable:true
connection.url=jdbc:postgresql://localhost:5432/postgres
connection.user=postgres
connection.password=luka
insert.mode=upsert
pk.mode=kafka
pk_fields=__connect_topic,__connect_partition,__connect_offset
fields.whitelist=ident,auth,response,request,clientip
auto.create=true
auto.evolve=true
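(Since the worker runs in distributed mode, a connector config like this would normally be submitted to the worker's REST API as JSON; a rough sketch of the equivalent request, assuming the default port 8083:)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "jdbc-test-sink",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "tasks.max": "1",
      "topics": "processed",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "true",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "true",
      "connection.url": "jdbc:postgresql://localhost:5432/postgres",
      "connection.user": "postgres",
      "connection.password": "luka",
      "insert.mode": "upsert",
      "pk.mode": "kafka",
      "pk.fields": "__connect_topic,__connect_partition,__connect_offset",
      "fields.whitelist": "ident,auth,response,request,clientip",
      "auto.create": "true",
      "auto.evolve": "true"
    }
  }'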
confluent-distributed.properties
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/share/java
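(The distributed worker itself is started with this file; with the Confluent Platform scripts that is something along the lines of the command below, the path to the properties file being an assumption:)
# start a distributed Kafka Connect worker with the config above
connect-distributed /path/to/confluent-distributed.properties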
quickstart-elasticsearch.properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
#topics=test-elasticsearch-sink
topics=processed
key.ignore=true
connection.url=http://localhost:9200
type.name=kafka-connect
schema.ignore=true
schemas.enable=false
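(Once this sink is running, the documents should end up in an Elasticsearch index named after the topic; a quick sanity check against the same Elasticsearch instance:)
curl http://localhost:9200/processed/_search?pretty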
The confluent-schema-registry service is running.
I get the following error after running curl http://localhost:8083/connectors/jdbc-sink/status | jq:
{
"name": "jdbc-sink",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.50.37:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "192.168.50.37:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:359)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
"
}
],
"type": "sink"
}
This is what a message in my "processed" topic looks like (messages in the topic are one-liners; this one has just been formatted):
{
"ROWTIME": 1587134287569,
"ROWKEY": "null",
"bytes": "4050",
"input": {
"type": "log"
},
"clientip": "156.226.170.95",
"@timestamp": "2020-04-17T14:38:06.346Z",
"timestamp": "17/Apr/2020:14:37:57 +0000",
"@version": "1",
"request": "/lists",
"ident": "536e605f097a92cb07c2a0a81f809f481c5af00d13305f0094057907792ce65e2a62b8ab8ba036f95a840504c3e2f05a",
"httpversion": "1.1",
"auth": "33a7f4a829adfaa60085eca1b84e0ae8f0aa2835d206ac765c22ad440e50d7ae462adafb13306aecfdcd6bd80b52b03f",
"agent": {
"ephemeral_id": "053b9c29-9038-4a89-a2b3-a5d8362460fe",
"version": "7.6.2",
"id": "50e21169-5aa0-496f-b792-3936e9c8de04",
"hostname": "HOSTNAME_OF_MY_AWS_INSTANCE",
"type": "filebeat"
},
"log": {
"offset": 707943,
"file": {
"path": "/home/ec2-user/log/apache.log"
}
},
"host": {
"name": "HOSTNAME_OF_MY_AWS_INSTANCE"
},
"verb": "DELETE",
"ecs": {
"version": "1.4.0"
},
"response": "503"
}
Let me know if you need more information.
Your error is here:
DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
Since this is the JDBC sink, you must provide a schema with your data. If you have the option, I suggest you use Avro. If not, you must structure your JSON data the way Kafka Connect expects it.
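With the JsonConverter and schemas.enable=true, every record value has to be wrapped in a schema/payload envelope. As a sketch (not the exact schema you need; field names and types are only assumed from your sample message and your fields.whitelist), one of your records would have to look roughly like this:
{
  "schema": {
    "type": "struct",
    "name": "processed_record",
    "optional": false,
    "fields": [
      { "field": "ident",    "type": "string", "optional": true },
      { "field": "auth",     "type": "string", "optional": true },
      { "field": "response", "type": "string", "optional": true },
      { "field": "request",  "type": "string", "optional": true },
      { "field": "clientip", "type": "string", "optional": true }
    ]
  },
  "payload": {
    "ident": "536e605f...",
    "auth": "33a7f4a8...",
    "response": "503",
    "request": "/lists",
    "clientip": "156.226.170.95"
  }
}
If you go the Avro route instead, you already have the Schema Registry running; the converter lines in the sink config would change to something like the following (assuming the registry on its default port 8081), and whatever produces to the "processed" topic would then have to write Avro instead of plain JSON:
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081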