kafka JDBC MySQL 源连接器是否需要在本地主机上有 MySQL 服务器?
Does the kafka JDBC MySQL source connector need to have MySQL Server on localhost?
我是 Kafka 的新手,我正在尝试建立一个简单的 kafka 连接系统和 运行 一个 MySQL 源连接器和一个 Elasticsearch + Elastic 搜索接收器连接器;用于基本数据流目的。
我正在按照以下步骤操作
https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/
及其第 2 部分
(我已经通过在源端有一个简单的生产者来验证 ES 的工作。)
除 MySQL 源连接器外,一切都已配置并按预期工作。
我尝试这一切的虚拟机 没有安装 MySQL 服务器 。本教程的 DBMS 部分我正在使用客户端 create/alter 并摆弄表格。
因此在源属性中,我尝试了:
"connection.url": "jdbc:mysql://IPaddressofDB:3306/DBname?user=uname&password=pwd"
"table.whitelist": "tablename"
为了启动连接器,我只是做了一个 ./confluent load connector-name
一旦我加载源连接器并检查其状态,它就会给出一个错误
"org.apache.kafka.connect.errors.ConnectException: Failed trying to validate that columns used for offsets are NOT NULL\n\t ...
Caused by: java.sql.SQLSyntaxErrorException: Table 'admin_portal.tablename' doesn't exist\n\t
这是否正确?我是不是完全漏掉了什么?
如何为我正在尝试的情况指定 connection.url:您尝试连接到不同数据库服务器的位置?几乎所有 examples/git 问题等似乎都只指定本地主机。
我不确定 admin_portal
的来源,我根本没有在任何地方指定
****针对@robin-moffat 的建议进行了编辑(似乎给出了与之前相同的错误)
sourceconfig.json:
{
"name": "jdbc_source_mysql_new",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://ipaddress:3306/dbname?user=uname&password=pwd",
"table.whitelist": "dbname.tablename",
"topic.prefix": "mysql-new-",
"mode":"incrementing",
"incrementing.column.name": "colname"
}
}
加载连接器:
>curl -X POST -H "Content-Type: application/json" --data @sourceconfig.json http://localhost:8083/connectors
检查连接器的状态:
>curl -X GET localhost:8083/connectors/jdbc_source_mysql_new/tasks/0/status
{"state":"FAILED","
"trace":
"org.apache.kafka.connect.errors.ConnectException: Failed trying to validate that columns used for offsets are NOT NULL\n\t
at io.confluent.connect.jdbc.source.JdbcSourceTask.validateNonNullable(JdbcSourceTask.java:400)\n\t
at io.confluent.connect.jdbc.source.JdbcSourceTask.start(JdbcSourceTask.java:156)\n\t
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)\n\t
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\t
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\t
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\t
at java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\t
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\t
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\t
at java.lang.Thread.run(Thread.java:748)\n
Caused by: java.sql.SQLSyntaxErrorException: Table 'admin_portal.tablename' doesn't exist\n\t
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)\n\t
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)\n\t
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)\n\t
at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1218)\n\t
at com.mysql.cj.jdbc.DatabaseMetaData.forEach(DatabaseMetaData.java:2950)\n\t
at com.mysql.cj.jdbc.DatabaseMetaData.forEach(DatabaseMetaData.java:2938)\n\t
at com.mysql.cj.jdbc.IterateBlock.doForAll(IterateBlock.java:56)\n\t
at com.mysql.cj.jdbc.DatabaseMetaData.getPrimaryKeys(DatabaseMetaData.java:2991)\n\t
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.primaryKeyColumns(GenericDatabaseDialect.java:696)\n\t
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeColumns(GenericDatabaseDialect.java:533)\n\t
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeColumns(GenericDatabaseDialect.java:513)\n\t
at io.confluent.connect.jdbc.source.JdbcSourceTask.validateNonNullable(JdbcSourceTask.java:369)\n\t... 9 more\n",}
Does the kafka JDBC MySQL source connector need to have MySQL Server on localhost?
没有。它使用 JDBC 可以连接到远程实例上的服务器。
- Is this even correct? Am I missing something completely?
根据您的描述,您是对的:)
- How to specify connection.url for cases like I am trying: where you are trying to connect to different DB servers? Almost all the examples/git issues etc seem to only specify localhost.
可以看到an example here
您需要正确配置 JDBC URL,can be found here for MySQL.
的语法
- I'm not sure where admin_portal came from, I have not specified that anywhere at all
这取决于您连接到数据库的用户的权限。您需要确保它可以访问要从中读取数据的 table。您还可以限定您的 table 名称,例如
"table.whitelist": "schema.tablename"
在我将我的 SQL 连接器版本从 8.x 降级到 5.1.47 并将其放在正确的 $CLASSPATH
中后它起作用了
mysql-connector-java-5.1.47.jar
我是 Kafka 的新手,我正在尝试建立一个简单的 kafka 连接系统和 运行 一个 MySQL 源连接器和一个 Elasticsearch + Elastic 搜索接收器连接器;用于基本数据流目的。
我正在按照以下步骤操作 https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/ 及其第 2 部分 (我已经通过在源端有一个简单的生产者来验证 ES 的工作。)
除 MySQL 源连接器外,一切都已配置并按预期工作。 我尝试这一切的虚拟机 没有安装 MySQL 服务器 。本教程的 DBMS 部分我正在使用客户端 create/alter 并摆弄表格。 因此在源属性中,我尝试了:
"connection.url": "jdbc:mysql://IPaddressofDB:3306/DBname?user=uname&password=pwd"
"table.whitelist": "tablename"
为了启动连接器,我只是做了一个 ./confluent load connector-name
一旦我加载源连接器并检查其状态,它就会给出一个错误
"org.apache.kafka.connect.errors.ConnectException: Failed trying to validate that columns used for offsets are NOT NULL\n\t ...
Caused by: java.sql.SQLSyntaxErrorException: Table 'admin_portal.tablename' doesn't exist\n\t
这是否正确?我是不是完全漏掉了什么?
如何为我正在尝试的情况指定 connection.url:您尝试连接到不同数据库服务器的位置?几乎所有 examples/git 问题等似乎都只指定本地主机。
我不确定
admin_portal
的来源,我根本没有在任何地方指定
****针对@robin-moffat 的建议进行了编辑(似乎给出了与之前相同的错误)
sourceconfig.json:
{
"name": "jdbc_source_mysql_new",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://ipaddress:3306/dbname?user=uname&password=pwd",
"table.whitelist": "dbname.tablename",
"topic.prefix": "mysql-new-",
"mode":"incrementing",
"incrementing.column.name": "colname"
}
}
加载连接器:
>curl -X POST -H "Content-Type: application/json" --data @sourceconfig.json http://localhost:8083/connectors
检查连接器的状态:
>curl -X GET localhost:8083/connectors/jdbc_source_mysql_new/tasks/0/status
{"state":"FAILED","
"trace":
"org.apache.kafka.connect.errors.ConnectException: Failed trying to validate that columns used for offsets are NOT NULL\n\t
at io.confluent.connect.jdbc.source.JdbcSourceTask.validateNonNullable(JdbcSourceTask.java:400)\n\t
at io.confluent.connect.jdbc.source.JdbcSourceTask.start(JdbcSourceTask.java:156)\n\t
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)\n\t
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\t
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\t
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\t
at java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\t
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\t
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\t
at java.lang.Thread.run(Thread.java:748)\n
Caused by: java.sql.SQLSyntaxErrorException: Table 'admin_portal.tablename' doesn't exist\n\t
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)\n\t
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)\n\t
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)\n\t
at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1218)\n\t
at com.mysql.cj.jdbc.DatabaseMetaData.forEach(DatabaseMetaData.java:2950)\n\t
at com.mysql.cj.jdbc.DatabaseMetaData.forEach(DatabaseMetaData.java:2938)\n\t
at com.mysql.cj.jdbc.IterateBlock.doForAll(IterateBlock.java:56)\n\t
at com.mysql.cj.jdbc.DatabaseMetaData.getPrimaryKeys(DatabaseMetaData.java:2991)\n\t
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.primaryKeyColumns(GenericDatabaseDialect.java:696)\n\t
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeColumns(GenericDatabaseDialect.java:533)\n\t
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeColumns(GenericDatabaseDialect.java:513)\n\t
at io.confluent.connect.jdbc.source.JdbcSourceTask.validateNonNullable(JdbcSourceTask.java:369)\n\t... 9 more\n",}
Does the kafka JDBC MySQL source connector need to have MySQL Server on localhost?
没有。它使用 JDBC 可以连接到远程实例上的服务器。
- Is this even correct? Am I missing something completely?
根据您的描述,您是对的:)
- How to specify connection.url for cases like I am trying: where you are trying to connect to different DB servers? Almost all the examples/git issues etc seem to only specify localhost.
可以看到an example here
您需要正确配置 JDBC URL,can be found here for MySQL.
的语法
- I'm not sure where admin_portal came from, I have not specified that anywhere at all
这取决于您连接到数据库的用户的权限。您需要确保它可以访问要从中读取数据的 table。您还可以限定您的 table 名称,例如
"table.whitelist": "schema.tablename"
在我将我的 SQL 连接器版本从 8.x 降级到 5.1.47 并将其放在正确的 $CLASSPATH
中后它起作用了mysql-connector-java-5.1.47.jar