使用kafka为clickhouse生产数据
Using kafka to produce data for clickhouse
我想为 clickhouse 使用 kafka 集成。我尝试使用像here这样的官方教程!所有 table 已创建。我 运行 卡夫卡服务器。接下来 运行 kafka 生产者并在命令提示符中写入 json 对象,如数据库中的行。像这样:
{"timestamp":1554138000,"level":"first","message":"abc"}
我检查了 kafka consumer.It 收到的对象。但是当我在我的 clickhouse 数据库中检查 tables 时,有空行。知道我做错了什么吗?
更新
要忽略格式错误的消息,请将 kafka_skip_broken_messages-参数传递给 table 定义。
看起来像 well-known issue 出现在最新版本的 CH 之一中,尝试在引擎配置中添加额外参数 kafka_row_delimiter:
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
)
ENGINE = Kafka SETTINGS
kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\n'
kafka_skip_broken_messages = 1;
非常抱歉。这是我的失败。在开始clickhouse和kafka之前。我测试了通过 kafka 将简单消息发送到主题中。 clickhouse 尝试解析它。我刚刚创建新主题,现在一切正常。谢谢!
我想为 clickhouse 使用 kafka 集成。我尝试使用像here这样的官方教程!所有 table 已创建。我 运行 卡夫卡服务器。接下来 运行 kafka 生产者并在命令提示符中写入 json 对象,如数据库中的行。像这样:
{"timestamp":1554138000,"level":"first","message":"abc"}
我检查了 kafka consumer.It 收到的对象。但是当我在我的 clickhouse 数据库中检查 tables 时,有空行。知道我做错了什么吗?
更新
要忽略格式错误的消息,请将 kafka_skip_broken_messages-参数传递给 table 定义。
看起来像 well-known issue 出现在最新版本的 CH 之一中,尝试在引擎配置中添加额外参数 kafka_row_delimiter:
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
)
ENGINE = Kafka SETTINGS
kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\n'
kafka_skip_broken_messages = 1;
非常抱歉。这是我的失败。在开始clickhouse和kafka之前。我测试了通过 kafka 将简单消息发送到主题中。 clickhouse 尝试解析它。我刚刚创建新主题,现在一切正常。谢谢!