SF KAFKA 连接器详细信息:Table 没有兼容的架构 - snowflake kafka 连接器
SF KAFKA CONNECTOR Detail: Table doesn't have a compatible schema - snowflake kafka connector
我已经设置了 snowflake - kafka 连接器。我在 snowflake 中设置了一个示例 table (kafka_connector_test),其中有 2 个字段都是 VARCHAR 类型。
字段是 CUSTOMER_ID 和 PURCHASE_ID.
这是我为连接器创建的配置
curl -X POST \
-H "Content-Type: application/json" \
--data '{
"name":"kafka_connector_test",
"config":{
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"2",
"topics":"kafka-connector-test",
"snowflake.topic2table.map": "kafka-connector-test:kafka_connector_test",
"buffer.count.records":"10000",
"buffer.flush.time":"60",
"buffer.size.bytes":"5000000",
"snowflake.url.name":"XXXXXXXX.snowflakecomputing.com:443",
"snowflake.user.name":"XXXXXXXX",
"snowflake.private.key":"XXXXXXXX",
"snowflake.database.name":"XXXXXXXX",
"snowflake.schema.name":"XXXXXXXX",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"}}'\
我将数据发送到我在连接器配置中配置的主题。
{"CUSTOMER_ID" : "test_id", "PURCHASE_ID" : "purchase_id_test"}
然后,当我检查 kafka-connect 服务器时,出现以下错误:
[SF KAFKA CONNECTOR] Detail: Table doesn't have a compatible schema
我是否需要在 kafka connect 或 snowflake 中设置一些东西来说明 json 的哪些部分进入 table 的哪些列?不确定如何指定它如何解析 json.
我也设置了一个不同的主题,但没有在 snowlake 中创建 table。因为我能够填充这个 table 但连接器使 table 具有 2 列 RECORD_METADATA 和 RECORD_CONTENT。但是我不想写一个计划的工作来解析这个我想直接插入到一个可查询的 table.
Snowflake Kafka 连接器按设计将数据写入 json。默认列 RECORD_METADATA
和 RECORD_CONTENT
是变体。如果你想查询它们,你可以在 table 之上创建一个视图来实现你的目标,你不需要一个预定的工作
因此,连接器创建的 table 类似于
RECORD_METADATA, RECORD_CONTENT
{metadata fields in json}, {"CUSTOMER_ID" : "test_id", "PURCHASE_ID" : "purchase_id_test"}
您可以创建一个视图来显示您的数据
create view v1 as
select RECORD_CONTENT:CUSTOMER_ID::text CUSTOMER_ID,
RECORD_CONTENT:PURCHASE_ID::text PURCHASE_ID
您的查询将是
select CUSTOMER_ID , PURCHASE_ID from v1
PS。如果你想创建你自己的 tables 你需要使用 variant
作为你的数据类型而不是 varchar
,目前似乎也不支持它
我已经设置了 snowflake - kafka 连接器。我在 snowflake 中设置了一个示例 table (kafka_connector_test),其中有 2 个字段都是 VARCHAR 类型。 字段是 CUSTOMER_ID 和 PURCHASE_ID.
这是我为连接器创建的配置
curl -X POST \
-H "Content-Type: application/json" \
--data '{
"name":"kafka_connector_test",
"config":{
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"2",
"topics":"kafka-connector-test",
"snowflake.topic2table.map": "kafka-connector-test:kafka_connector_test",
"buffer.count.records":"10000",
"buffer.flush.time":"60",
"buffer.size.bytes":"5000000",
"snowflake.url.name":"XXXXXXXX.snowflakecomputing.com:443",
"snowflake.user.name":"XXXXXXXX",
"snowflake.private.key":"XXXXXXXX",
"snowflake.database.name":"XXXXXXXX",
"snowflake.schema.name":"XXXXXXXX",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"}}'\
我将数据发送到我在连接器配置中配置的主题。
{"CUSTOMER_ID" : "test_id", "PURCHASE_ID" : "purchase_id_test"}
然后,当我检查 kafka-connect 服务器时,出现以下错误:
[SF KAFKA CONNECTOR] Detail: Table doesn't have a compatible schema
我是否需要在 kafka connect 或 snowflake 中设置一些东西来说明 json 的哪些部分进入 table 的哪些列?不确定如何指定它如何解析 json.
我也设置了一个不同的主题,但没有在 snowlake 中创建 table。因为我能够填充这个 table 但连接器使 table 具有 2 列 RECORD_METADATA 和 RECORD_CONTENT。但是我不想写一个计划的工作来解析这个我想直接插入到一个可查询的 table.
Snowflake Kafka 连接器按设计将数据写入 json。默认列 RECORD_METADATA
和 RECORD_CONTENT
是变体。如果你想查询它们,你可以在 table 之上创建一个视图来实现你的目标,你不需要一个预定的工作
因此,连接器创建的 table 类似于
RECORD_METADATA, RECORD_CONTENT
{metadata fields in json}, {"CUSTOMER_ID" : "test_id", "PURCHASE_ID" : "purchase_id_test"}
您可以创建一个视图来显示您的数据
create view v1 as
select RECORD_CONTENT:CUSTOMER_ID::text CUSTOMER_ID,
RECORD_CONTENT:PURCHASE_ID::text PURCHASE_ID
您的查询将是
select CUSTOMER_ID , PURCHASE_ID from v1
PS。如果你想创建你自己的 tables 你需要使用 variant
作为你的数据类型而不是 varchar