Forwarding messages from Kafka to Elasticsearch and PostgreSQL

I am trying to build an infrastructure in which I need to forward messages from one Kafka topic to both Elasticsearch and PostgreSQL. My setup is shown in the picture below; everything runs on the same host. Logstash does some anonymization and a few mutations and sends the documents back to Kafka as JSON. Kafka should then forward the messages to PostgreSQL and Elasticsearch.

Everything works fine except the connection to PostgreSQL, where I am running into some problems.

My configuration files look like this:

sink-quickstart-sqlite.properties

name=jdbc-test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#table.name.format=${topic}
topics=processed

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable:true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable:true

connection.url=jdbc:postgresql://localhost:5432/postgres
connection.user=postgres
connection.password=luka
insert.mode=upsert

pk.mode=kafka

pk_fields=__connect_topic,__connect_partition,__connect_offset
fields.whitelist=ident,auth,response,request,clientip
auto.create=true
auto.evolve=true

confluent-distributed.properties

group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/share/java

quickstart-elasticsearch.properties

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
#topics=test-elasticsearch-sink
topics=processed
key.ignore=true
connection.url=http://localhost:9200
type.name=kafka-connect
schema.ignore=true
schemas.enable=false

The confluent-schema-registry service is running.

I get the following error after running curl http://localhost:8083/connectors/jdbc-sink/status | jq:

{
  "name": "jdbc-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.50.37:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "192.168.50.37:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
                    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
                    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
                    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
                    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
                    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
                    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
                    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
                    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
                    at java.base/java.lang.Thread.run(Thread.java:834)
                Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
                    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:359)
                    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:488)
                    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
                    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
                    ... 13 more
"
    }
  ],
  "type": "sink"
}

This is what a message in my "processed" topic looks like (the messages in the topic are one-liners; this one has just been formatted for readability):

{
    "ROWTIME": 1587134287569,
    "ROWKEY": "null",
    "bytes": "4050",
    "input": {
        "type": "log"
    },
    "clientip": "156.226.170.95",
    "@timestamp": "2020-04-17T14:38:06.346Z",
    "timestamp": "17/Apr/2020:14:37:57 +0000",
    "@version": "1",
    "request": "/lists",
    "ident": "536e605f097a92cb07c2a0a81f809f481c5af00d13305f0094057907792ce65e2a62b8ab8ba036f95a840504c3e2f05a",
    "httpversion": "1.1",
    "auth": "33a7f4a829adfaa60085eca1b84e0ae8f0aa2835d206ac765c22ad440e50d7ae462adafb13306aecfdcd6bd80b52b03f",
    "agent": {
        "ephemeral_id": "053b9c29-9038-4a89-a2b3-a5d8362460fe",
        "version": "7.6.2",
        "id": "50e21169-5aa0-496f-b792-3936e9c8de04",
        "hostname": "HOSTNAME_OF_MY_AWS_INSTANCE",
        "type": "filebeat"
    },
    "log": {
        "offset": 707943,
        "file": {
            "path": "/home/ec2-user/log/apache.log"
        }
    },
    "host": {
        "name": "HOSTNAME_OF_MY_AWS_INSTANCE"
    },
    "verb": "DELETE",
    "ecs": {
        "version": "1.4.0"
    },
    "response": "503"
}

Let me know if you need more information.

Your error is here:

DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

Since this is a JDBC sink, you must provide a schema for your data. If you have the option, I would recommend using Avro (converter settings are sketched at the bottom of this answer). If not, you must structure your JSON data the way Kafka Connect requires, with each message carrying an explicit schema/payload envelope, as sketched below.
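With the JsonConverter and schemas.enable=true, every message on the topic has to carry its own schema next to the payload. A minimal sketch of that envelope for the whitelisted fields of your sample message (string types assumed for all of them, hash values shortened here):

{
    "schema": {
        "type": "struct",
        "optional": false,
        "fields": [
            { "field": "clientip", "type": "string", "optional": true },
            { "field": "ident", "type": "string", "optional": true },
            { "field": "auth", "type": "string", "optional": true },
            { "field": "request", "type": "string", "optional": true },
            { "field": "response", "type": "string", "optional": true }
        ]
    },
    "payload": {
        "clientip": "156.226.170.95",
        "ident": "536e605f...",
        "auth": "33a7f4a8...",
        "request": "/lists",
        "response": "503"
    }
}

Your "processed" messages do not carry this envelope, which is exactly what the DataException is complaining about.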

More information: https://www.youtube.com/watch?v=b-3qN_tlYR4&t=981s
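If you go the Avro route instead (you mention the confluent-schema-registry service is already running), the converter settings in the worker or connector config would look roughly like this, a sketch assuming Schema Registry listens on its default address http://localhost:8081. Note that whatever produces to the processed topic must then also serialize the data as Avro; a converter change alone is not enough.

# sketch: Avro converters backed by Schema Registry (default port 8081 assumed)
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081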