Deserializing Avro messages
I deployed Kafka from here. I also added a Postgres container to docker-compose.yml like this:
postgres:
  image: postgres
  hostname: kafka-postgres
  container_name: kafka-postgres
  depends_on:
    - ksql-server
    - broker
    - schema-registry
    - connect
  ports:
    - 5432:5432
I created a topic called pageviews.
In addition, I created a DatagenConnector with the following settings and ran it:
{
  "name": "datagen-pageviews",
  "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "kafka.topic": "pageviews",
  "max.interval": "100",
  "iterations": "999999999",
  "quickstart": "pageviews"
}
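For reference, this kind of connector can be registered through the Kafka Connect REST API. A minimal sketch, assuming the Connect worker is reachable at http://localhost:8083 and the requests library is installed:

import requests

# Connector config as in the question (the connector name goes into the URL instead).
config = {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "kafka.topic": "pageviews",
    "max.interval": "100",
    "iterations": "999999999",
    "quickstart": "pageviews",
}

# PUT /connectors/{name}/config creates the connector if it does not exist yet,
# and updates it otherwise.
resp = requests.put("http://localhost:8083/connectors/datagen-pageviews/config", json=config)
resp.raise_for_status()
print(resp.json())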
As far as I can tell, the connector defines a schema for the topic:
{
  "type": "record",
  "name": "pageviews",
  "namespace": "ksql",
  "fields": [
    {
      "name": "viewtime",
      "type": "long"
    },
    {
      "name": "userid",
      "type": "string"
    },
    {
      "name": "pageid",
      "type": "string"
    }
  ],
  "connect.name": "ksql.pageviews"
}
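One way to confirm what actually got registered is to ask the Schema Registry directly. A sketch, assuming the registry runs at http://localhost:8081 and the subject follows the default TopicNameStrategy (pageviews-value):

import json
import requests

# Fetch the latest value schema registered for the pageviews topic.
resp = requests.get("http://localhost:8081/subjects/pageviews-value/versions/latest")
resp.raise_for_status()

latest = resp.json()
print("schema id:", latest["id"])
# The "schema" field is itself a JSON-encoded string.
print(json.dumps(json.loads(latest["schema"]), indent=2))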
My next step was to create a JdbcSinkConnector to move data from the Kafka topic into a Postgres table. That worked. The connector's settings:
{
  "name": "from-kafka-to-pg",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topics": [
    "pageviews"
  ],
  "connection.url": "jdbc:postgresql://kafka-postgres:5432/postgres",
  "connection.user": "postgres",
  "connection.password": "********",
  "auto.create": "true",
  "auto.evolve": "true"
}
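To confirm the sink is actually writing rows, the auto-created table can be queried directly. A sketch, assuming psycopg2 is installed and the table name defaults to the topic name (pageviews):

import psycopg2

# Connect to the Postgres container exposed on localhost:5432.
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname="postgres",
    user="postgres",
    password="********",  # replace with the real password
)

with conn, conn.cursor() as cur:
    cur.execute('SELECT COUNT(*) FROM "pageviews"')
    print("rows in pageviews:", cur.fetchone()[0])

conn.close()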
Then I tried to send messages to that topic myself. But that failed with an error:
[2020-02-01 21:16:11,750] ERROR Error encountered in task to-pg-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='pageviews', partition=0, offset=23834, timestamp=1580591160374, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic pageviews to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
So the way the messages are produced matters. This is how I was doing it (Python, confluent-kafka-python):
from confluent_kafka import Producer
import json

producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.poll(0)
producer.produce(topic, json.dumps({
    'viewtime': 123,
    'userid': 'user_1',
    'pageid': 'page_1'
}).encode('utf8'), on_delivery=kafka_delivery_report)
producer.flush()
Maybe I should provide a schema along with the message (AvroProducer)?
Your problem arises because you are trying to use the Avro converter to read data from a topic that is not Avro.
There are two possible solutions:
1. Switch the Kafka Connect sink connector to use the correct converter
For example, if you are consuming JSON data from the Kafka topic into the Kafka Connect sink:
...
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true/false
...
Whether value.converter.schemas.enable should be true or false depends on whether the messages embed a schema.
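For instance, with value.converter.schemas.enable=true, each JSON message would need to carry the schema/payload envelope that JsonConverter expects (the JDBC sink needs a schema to build the table). A sketch of what a single pageviews record could look like in that format, with field types inferred from the Avro schema above:

{
  "schema": {
    "type": "struct",
    "name": "ksql.pageviews",
    "optional": false,
    "fields": [
      { "field": "viewtime", "type": "int64", "optional": false },
      { "field": "userid", "type": "string", "optional": false },
      { "field": "pageid", "type": "string", "optional": false }
    ]
  },
  "payload": {
    "viewtime": 123,
    "userid": "user_1",
    "pageid": "page_1"
  }
}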
2. Switch the upstream format to Avro
To have the DatagenConnector produce messages to Kafka with values in Avro format, set the value.converter and value.converter.schema.registry.url parameters:
...
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
...
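Merged into the datagen connector config from the question, the full configuration would look roughly like this (a sketch; the Schema Registry URL is taken from the snippet above and may differ in your setup):

{
  "name": "datagen-pageviews",
  "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081",
  "kafka.topic": "pageviews",
  "max.interval": "100",
  "iterations": "999999999",
  "quickstart": "pageviews"
}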
See the kafka-connect-datagen docs for details.
This article on Kafka Connect converters and serialization is excellent.
The topic expects messages in Avro format.
The AvroProducer from confluent-kafka-python did the trick:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
value_schema_str = """
{
  "namespace": "ksql",
  "name": "value",
  "type": "record",
  "fields": [
    {
      "name": "viewtime",
      "type": "long"
    },
    {
      "name": "userid",
      "type": "string"
    },
    {
      "name": "pageid",
      "type": "string"
    }
  ]
}
"""
key_schema_str = """
{
  "namespace": "ksql",
  "name": "key",
  "type": "record",
  "fields": [
    {
      "name": "pageid",
      "type": "string"
    }
  ]
}
"""
value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
# Example records matching the schemas declared above.
value = {"viewtime": 123, "userid": "user_1", "pageid": "page_1"}
key = {"pageid": "page_1"}
def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

avroProducer = AvroProducer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'on_delivery': delivery_report,
    'schema.registry.url': 'http://schema_registry_host:port'
}, default_key_schema=key_schema, default_value_schema=value_schema)

avroProducer.produce(topic='my_topic', value=value, key=key)
avroProducer.flush()
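A quick way to check whether what lands on the topic is really Confluent-framed Avro is to look at the first bytes of a raw record: the Confluent wire format starts with a magic byte 0 followed by a 4-byte big-endian schema ID, and the "Unknown magic byte!" error means that framing was missing. A minimal sketch with a plain consumer, assuming the broker is on localhost:9092:

import struct

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'wire-format-check',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['pageviews'])

msg = consumer.poll(10.0)
if msg is not None and msg.error() is None:
    raw = msg.value()
    # Confluent wire format: 1 magic byte (0) + 4-byte schema ID + Avro-encoded body.
    magic, schema_id = struct.unpack('>bI', raw[:5])
    print('magic byte:', magic, 'schema id:', schema_id)

consumer.close()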