无法使用 JDBC Connect 从 Kafka 流向 ClickHouse table 插入数据
Can not insert data to ClickHouse table from Kafka stream using JDBC Connect
Kafka 和 Clickhouse 在 Docker 中 运行。我正在尝试通过 JDBC 连接将一些数据从 Kafka 流插入到 ClickHouse table。从流中查询数据显示数据在 stream.Then 我创建的 Clickhouse table 中具有与 Kafka 流中相同的字段
CREATE TABLE IF NOT EXISTS default.table
(
id Int32,
ms_segment_group_id Int32,
transact_survey_tmpl_id Int32,
customer_id Int32,
customer_account_id Int32,
agent String,
comm_channel_id Int32,
comm_channel_address String,
answer_date String,
communication_type_id Int32,
object_id Int32,
create_date String,
application_id String,
external_id Int32,
transact_survey_state_id Int32,
code String
) ENGINE = MergeTree()
PARTITION BY id
ORDER BY tuple();
然后我在 ksql
中创建 JDBC CONNECT
CREATE SOURCE CONNECTOR `clickhouse-jdbc-connector` WITH (
'connector.class'='io.confluent.connect.jdbc.JdbcSinkConnector',
'topics'='STREAM_TEST',
'tasks.max'='1',
'connection.url'='jdbc:clickhouse://clickhouse:8123/default',
'table.name.format'='table'
);
ksql 的输出表明连接器已创建,一切正常。然后我尝试从 ClickHouse Table SELECT 并得到空结果:
据我了解,ClickHouse 中没有数据 table 并且 JDBC 连接无法正常工作?
执行此操作的最简单方法是将引擎从 MergeTree 更改为 Kafka。首先,需要确保 Kafka 主题正在接收消息。
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_thread_per_consumer = 0]
Kafka 和 Clickhouse 在 Docker 中 运行。我正在尝试通过 JDBC 连接将一些数据从 Kafka 流插入到 ClickHouse table。从流中查询数据显示数据在 stream.Then 我创建的 Clickhouse table 中具有与 Kafka 流中相同的字段
CREATE TABLE IF NOT EXISTS default.table
(
id Int32,
ms_segment_group_id Int32,
transact_survey_tmpl_id Int32,
customer_id Int32,
customer_account_id Int32,
agent String,
comm_channel_id Int32,
comm_channel_address String,
answer_date String,
communication_type_id Int32,
object_id Int32,
create_date String,
application_id String,
external_id Int32,
transact_survey_state_id Int32,
code String
) ENGINE = MergeTree()
PARTITION BY id
ORDER BY tuple();
然后我在 ksql
中创建 JDBC CONNECT CREATE SOURCE CONNECTOR `clickhouse-jdbc-connector` WITH (
'connector.class'='io.confluent.connect.jdbc.JdbcSinkConnector',
'topics'='STREAM_TEST',
'tasks.max'='1',
'connection.url'='jdbc:clickhouse://clickhouse:8123/default',
'table.name.format'='table'
);
ksql 的输出表明连接器已创建,一切正常。然后我尝试从 ClickHouse Table SELECT 并得到空结果:
据我了解,ClickHouse 中没有数据 table 并且 JDBC 连接无法正常工作?
执行此操作的最简单方法是将引擎从 MergeTree 更改为 Kafka。首先,需要确保 Kafka 主题正在接收消息。
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_thread_per_consumer = 0]