什么是数据类型 LOB 的列不能是唯一的或主键错误?
what is column of datatype LOB cannot be unique or a primary key error?
我的合流水槽属性:
name=sink-oracle
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
# The topics to consume from - required for sink connectors like this one
topics=ersin_test
# Configuration specific to the JDBC sink connector.
# We want to connect to a SQLite database stored in the file test.db and auto-create tables.
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
connection.url=jdbc:oracle:thin:@10.0.0.0:123/abc
connection.user=ersin
connection.password=ersin!
table.name.format=ERSIN_TEST
auto.create=true
delete.enabled=true
pk.mode=record_key
pk.fields=ID
insert.mode=upsert
错误
INFO Checking Oracle dialect for existence of table "ERSIN_TEST" (io.confluent.connect.jdbc.dia
lect.OracleDatabaseDialect:536)
[2020-04-15 00:31:44,982] INFO Using Oracle dialect table "ERSIN_TEST" absent (io.confluent.connect.jdbc.dialect.OracleDa
tabaseDialect:544)
[2020-04-15 00:31:44,982] INFO Creating table with sql: CREATE TABLE "ERSIN_TEST" (
"ID" CLOB NOT NULL,
"PRODUCT" CLOB NULL,
"QUANTITY" NUMBER(10,0) NULL,
"PRICE" NUMBER(10,0) NULL,
PRIMARY KEY("ID")) (io.confluent.connect.jdbc.sink.DbStructure:92)
[2020-04-15 00:31:44,995] WARN Create failed, will attempt amend if table already exists (io.confluent.connect.jdbc.sink.
DbStructure:63)
java.sql.SQLException: ORA-02329: column of datatype LOB cannot be unique or a primary key
json数据
{
"schema": {
"type": "struct",
"fields": [
{
"field": 'ID',
"type": "int32",
"optional": False
},
{
"field": 'PRODUCT',
"type": "string",
"optional": True
},
{
"field": 'QUANTITY',
"type": "int32",
"optional": True
},
{
"field": 'PRICE',
"type": "int32",
"optional": True
}
],
"optional": True,
"name": "myrecord"
},
"payload": {
"ID": 1071,
"PRODUCT": 'ersin',
"QUANTITIY": 1071,
"PRICE": 1453
}
python代码:
producer.send(topic, key=b'1071'
, value=json.dumps(v, default=json_util.default).encode('utf-8'))
我该如何解决这个问题?
提前致谢
连接器试图创建一个名为 ERSIN_TEST
的 table
[2020-04-15 00:31:44,982] INFO Creating table with sql: CREATE TABLE "ERSIN_TEST" (
"ID" CLOB NOT NULL,
"PRODUCT" CLOB NULL,
"QUANTITY" NUMBER(10,0) NULL,
"PRICE" NUMBER(10,0) NULL,
PRIMARY KEY("ID"))
使用 ID
字段作为主键。
If auto.create
is enabled, the connector can CREATE the destination
table if it is found to be missing. The creation takes place online
with records being consumed from the topic, since the connector uses
the record schema as a basis for the table definition. Primary keys are specified based on the key configuration settings.
对于 Oracle,默认情况下 JDBC sink connector 会将 VARCHAR
映射到 NCLOB
。
您需要更改 ID
的类型,因为类型 CLOB
的字段不能设置为主键。为此,首先您需要禁止 Kafka Connect 自动为您创建 table:
auto.create=false
现在转到您的 Oracle 数据库并手动创建 table,但这次不使用 CLOB
,而是使用 NUMBER
:
CREATE TABLE ERSIN_TEST (
"ID" NUMBER(10) NOT NULL,
"PRODUCT" CLOB NULL,
"QUANTITY" NUMBER(10,0) NULL,
"PRICE" NUMBER(10,0) NULL,
PRIMARY KEY("ID")
)
最后重新 运行 您的连接器。
附带说明一下,不要在 JSON 中使用单引号。
我的合流水槽属性:
name=sink-oracle
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
# The topics to consume from - required for sink connectors like this one
topics=ersin_test
# Configuration specific to the JDBC sink connector.
# We want to connect to a SQLite database stored in the file test.db and auto-create tables.
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
connection.url=jdbc:oracle:thin:@10.0.0.0:123/abc
connection.user=ersin
connection.password=ersin!
table.name.format=ERSIN_TEST
auto.create=true
delete.enabled=true
pk.mode=record_key
pk.fields=ID
insert.mode=upsert
错误
INFO Checking Oracle dialect for existence of table "ERSIN_TEST" (io.confluent.connect.jdbc.dia
lect.OracleDatabaseDialect:536)
[2020-04-15 00:31:44,982] INFO Using Oracle dialect table "ERSIN_TEST" absent (io.confluent.connect.jdbc.dialect.OracleDa
tabaseDialect:544)
[2020-04-15 00:31:44,982] INFO Creating table with sql: CREATE TABLE "ERSIN_TEST" (
"ID" CLOB NOT NULL,
"PRODUCT" CLOB NULL,
"QUANTITY" NUMBER(10,0) NULL,
"PRICE" NUMBER(10,0) NULL,
PRIMARY KEY("ID")) (io.confluent.connect.jdbc.sink.DbStructure:92)
[2020-04-15 00:31:44,995] WARN Create failed, will attempt amend if table already exists (io.confluent.connect.jdbc.sink.
DbStructure:63)
java.sql.SQLException: ORA-02329: column of datatype LOB cannot be unique or a primary key
json数据
{
"schema": {
"type": "struct",
"fields": [
{
"field": 'ID',
"type": "int32",
"optional": False
},
{
"field": 'PRODUCT',
"type": "string",
"optional": True
},
{
"field": 'QUANTITY',
"type": "int32",
"optional": True
},
{
"field": 'PRICE',
"type": "int32",
"optional": True
}
],
"optional": True,
"name": "myrecord"
},
"payload": {
"ID": 1071,
"PRODUCT": 'ersin',
"QUANTITIY": 1071,
"PRICE": 1453
}
python代码:
producer.send(topic, key=b'1071'
, value=json.dumps(v, default=json_util.default).encode('utf-8'))
我该如何解决这个问题?
提前致谢
连接器试图创建一个名为 ERSIN_TEST
[2020-04-15 00:31:44,982] INFO Creating table with sql: CREATE TABLE "ERSIN_TEST" (
"ID" CLOB NOT NULL,
"PRODUCT" CLOB NULL,
"QUANTITY" NUMBER(10,0) NULL,
"PRICE" NUMBER(10,0) NULL,
PRIMARY KEY("ID"))
使用 ID
字段作为主键。
If
auto.create
is enabled, the connector can CREATE the destination table if it is found to be missing. The creation takes place online with records being consumed from the topic, since the connector uses the record schema as a basis for the table definition. Primary keys are specified based on the key configuration settings.
对于 Oracle,默认情况下 JDBC sink connector 会将 VARCHAR
映射到 NCLOB
。
您需要更改 ID
的类型,因为类型 CLOB
的字段不能设置为主键。为此,首先您需要禁止 Kafka Connect 自动为您创建 table:
auto.create=false
现在转到您的 Oracle 数据库并手动创建 table,但这次不使用 CLOB
,而是使用 NUMBER
:
CREATE TABLE ERSIN_TEST (
"ID" NUMBER(10) NOT NULL,
"PRODUCT" CLOB NULL,
"QUANTITY" NUMBER(10,0) NULL,
"PRICE" NUMBER(10,0) NULL,
PRIMARY KEY("ID")
)
最后重新 运行 您的连接器。
附带说明一下,不要在 JSON 中使用单引号。