如何使用 confluent 从 kafka 主题到 elasticsearch?
How get the stream from kafka topic to elasticsearch with confluent?
我从机器读取数据并将其作为 JSON 流式传输到 kafka 主题。我想阅读此主题并将流数据存储到 elasticsearch 中。
我的步骤:
1. 创建 KSQL Streams 以从 JSON 转换为 AVRO
json 流:
CREATE STREAM source_json_pressure
(
timestamp BIGINT,
opcuaObject VARCHAR,
value DOUBLE
)
WITH (KAFKA_TOPIC='7d12h100mbpressure',
VALUE_FORMAT='JSON');
avro 流:
CREATE STREAM target_avro_pressure
WITH (
KAFKA_TOPIC='7d12h100mbpressure_avro',
VALUE_FORMAT='AVRO'
) AS
SELECT * FROM source_json_pressure;
在此之后我得到了这个 avro 流:
ksql> print "7d12h100mbpressure_avro";
Format:AVRO
23.04.19 19:29:58 MESZ, jK?C, {"TIMESTAMP": 1556040449728, "OPCUAOBJECT": "DatLuDrUeb.EinDru", "VALUE": 7.42}
我的elasticsearch.properties:
15 name=elasticsearch-sink
16 connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
17 tasks.max=1
18 topics=7d12h100mbpressure_avro
19 key.ignore=true
20 connection.url=http://localhost:9200
21 type.name=kafka-connect
在此之后我期待 ES 中的流,但我得到的是没有流数据的索引。
我哪里弄错了?
来自合流日志连接的错误:
[2019-04-24 11:01:29,316] INFO [Consumer clientId=consumer-4, groupId=connect-elasticsearch-sink] Setting newly assigned partitions: 7d12h100mbpressure_avro-3, 7d12h100mbpressure_avro-2, 7d12h100mbpressure_avro-1, 7d12h100mbpressure_avro-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:290)
[2019-04-24 11:01:29,327] INFO [Consumer clientId=consumer-4, groupId=connect-elasticsearch-sink] Resetting offset for partition 7d12h100mbpressure_avro-3 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
[2019-04-24 11:01:29,327] INFO [Consumer clientId=consumer-4, groupId=connect-elasticsearch-sink] Resetting offset for partition 7d12h100mbpressure_avro-2 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
[2019-04-24 11:01:29,327] INFO [Consumer clientId=consumer-4, groupId=connect-elasticsearch-sink] Resetting offset for partition 7d12h100mbpressure_avro-1 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
[2019-04-24 11:01:29,328] INFO [Consumer clientId=consumer-4, groupId=connect-elasticsearch-sink] Resetting offset for partition 7d12h100mbpressure_avro-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
[2019-04-24 11:01:29,667] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic 7d12h100mbpressure_avro to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord[=14=](WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 92747
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:226)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:145)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord[=14=](WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2019-04-24 11:01:29,668] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
我的 connect-avro-distributed.properties:
# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
您从 Elasticsink 设置了 key.ignore=true
,但是,这不会阻止 Connect 尝试反序列化记录。
当您只执行 confluent start
时,它将始终对键和值转换器使用 AvroConverter
。
值得一提的是,我认为,KSQL 中的VALUE_FORMAT='AVRO'
仅使value 成为Avro,而不是密钥。
其中一个原因可能解释了为什么您会看到
- 未找到主题
- 未找到架构
- 检索 id 的 Avro 架构时出错
要解决此问题,在您的 elasticsearch.properties
中,您可以将 key.converter
覆盖为其他内容,例如 org.apache.kafka.connect.storage.StringConverter
此外,我建议使用 kafka-avro-console-consumer
并包括 --property print.key=true
选项,而不是使用 Connect+KSQL 进行调试,以查看您是否会遇到类似的错误。
我从机器读取数据并将其作为 JSON 流式传输到 kafka 主题。我想阅读此主题并将流数据存储到 elasticsearch 中。
我的步骤: 1. 创建 KSQL Streams 以从 JSON 转换为 AVRO
json 流:
CREATE STREAM source_json_pressure
(
timestamp BIGINT,
opcuaObject VARCHAR,
value DOUBLE
)
WITH (KAFKA_TOPIC='7d12h100mbpressure',
VALUE_FORMAT='JSON');
avro 流:
CREATE STREAM target_avro_pressure
WITH (
KAFKA_TOPIC='7d12h100mbpressure_avro',
VALUE_FORMAT='AVRO'
) AS
SELECT * FROM source_json_pressure;
在此之后我得到了这个 avro 流:
ksql> print "7d12h100mbpressure_avro";
Format:AVRO
23.04.19 19:29:58 MESZ, jK?C, {"TIMESTAMP": 1556040449728, "OPCUAOBJECT": "DatLuDrUeb.EinDru", "VALUE": 7.42}
我的elasticsearch.properties:
15 name=elasticsearch-sink
16 connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
17 tasks.max=1
18 topics=7d12h100mbpressure_avro
19 key.ignore=true
20 connection.url=http://localhost:9200
21 type.name=kafka-connect
在此之后我期待 ES 中的流,但我得到的是没有流数据的索引。
我哪里弄错了?
来自合流日志连接的错误:
[2019-04-24 11:01:29,316] INFO [Consumer clientId=consumer-4, groupId=connect-elasticsearch-sink] Setting newly assigned partitions: 7d12h100mbpressure_avro-3, 7d12h100mbpressure_avro-2, 7d12h100mbpressure_avro-1, 7d12h100mbpressure_avro-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:290)
[2019-04-24 11:01:29,327] INFO [Consumer clientId=consumer-4, groupId=connect-elasticsearch-sink] Resetting offset for partition 7d12h100mbpressure_avro-3 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
[2019-04-24 11:01:29,327] INFO [Consumer clientId=consumer-4, groupId=connect-elasticsearch-sink] Resetting offset for partition 7d12h100mbpressure_avro-2 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
[2019-04-24 11:01:29,327] INFO [Consumer clientId=consumer-4, groupId=connect-elasticsearch-sink] Resetting offset for partition 7d12h100mbpressure_avro-1 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
[2019-04-24 11:01:29,328] INFO [Consumer clientId=consumer-4, groupId=connect-elasticsearch-sink] Resetting offset for partition 7d12h100mbpressure_avro-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
[2019-04-24 11:01:29,667] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic 7d12h100mbpressure_avro to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord[=14=](WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 92747
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:226)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:145)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord[=14=](WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2019-04-24 11:01:29,668] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
我的 connect-avro-distributed.properties:
# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
您从 Elasticsink 设置了 key.ignore=true
,但是,这不会阻止 Connect 尝试反序列化记录。
当您只执行 confluent start
时,它将始终对键和值转换器使用 AvroConverter
。
值得一提的是,我认为,KSQL 中的VALUE_FORMAT='AVRO'
仅使value 成为Avro,而不是密钥。
其中一个原因可能解释了为什么您会看到
- 未找到主题
- 未找到架构
- 检索 id 的 Avro 架构时出错
要解决此问题,在您的 elasticsearch.properties
中,您可以将 key.converter
覆盖为其他内容,例如 org.apache.kafka.connect.storage.StringConverter
此外,我建议使用 kafka-avro-console-consumer
并包括 --property print.key=true
选项,而不是使用 Connect+KSQL 进行调试,以查看您是否会遇到类似的错误。