如何从 flink 作业中查找数据库数据
How to lookup db data from a flink job
我是 Apache Flink 的新手。
我正在构建一个flink算子。需要从关系存储中获取数据以处理流数据。这是一个小的快速查找。我使用 spring-jdbc 客户端进行查找。
public class FilterCriteriaEvaluator extends KeyedProcessFunction<Long, DeviceAttrUpdate, FilterCriteriaEval> {
private NamedParameterJdbcTemplate namedParameterJdbcTemplate;
public FilterCriteriaEvaluator(DataSource dataSource) {
namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
}
...
然而,在集群上执行此作业时出现以下错误
Caused by: java.io.NotSerializableException: org.springframework.jdbc.core.JdbcTemplate
spring 数据库客户端不可序列化。然后我考虑直接使用 java.sql.DataSource
。但这也不是可序列化的。
将数据库客户端标记为瞬态没有帮助,因为它不会序列化为运算符对象的一部分,我在集群上执行作业时得到 NPE
。
我在这里错过了什么?我如何从 flink 作业中进行数据库查找?
作为解决方法,您可以将 JdbcTemplate
标记为瞬态并使其可延迟初始化 - 类似于
private transient JdbcTemplate instance = null;
// Not thread-safe
public JdbcTemplate getInstance() {
if(instance == null){
// init
}
return instance;
}
并通过 getInstance()
方法访问它。
但是,通过这种方式,每个任务槽都有一个单独的实例。
为了每个任务管理器只有一个实例,您可以将其设为静态变量。但是,这样您将需要关心线程安全并制作线程安全的初始化程序。
或者,您应该再次标记变量 transient
,但不是制作 getInstance
字段,而是从 RichFunction
扩展(或您拥有的任何操作)并在 open(Configuration parameters)
方法,在初始化时被调用,适用于一次性设置工作。
Flink 中内置的使您正在做的事情变得简单的机制是 lookup join 使用 Flink SQL。
看起来像这样:
-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
id INT,
name STRING,
country STRING,
zip STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
'table-name' = 'customers'
);
-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
有关 JDBC SQL 连接器的更多信息,请参阅 the docs。
我是 Apache Flink 的新手。
我正在构建一个flink算子。需要从关系存储中获取数据以处理流数据。这是一个小的快速查找。我使用 spring-jdbc 客户端进行查找。
public class FilterCriteriaEvaluator extends KeyedProcessFunction<Long, DeviceAttrUpdate, FilterCriteriaEval> {
private NamedParameterJdbcTemplate namedParameterJdbcTemplate;
public FilterCriteriaEvaluator(DataSource dataSource) {
namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
}
...
然而,在集群上执行此作业时出现以下错误
Caused by: java.io.NotSerializableException: org.springframework.jdbc.core.JdbcTemplate
spring 数据库客户端不可序列化。然后我考虑直接使用 java.sql.DataSource
。但这也不是可序列化的。
将数据库客户端标记为瞬态没有帮助,因为它不会序列化为运算符对象的一部分,我在集群上执行作业时得到 NPE
。
我在这里错过了什么?我如何从 flink 作业中进行数据库查找?
作为解决方法,您可以将 JdbcTemplate
标记为瞬态并使其可延迟初始化 - 类似于
private transient JdbcTemplate instance = null;
// Not thread-safe
public JdbcTemplate getInstance() {
if(instance == null){
// init
}
return instance;
}
并通过 getInstance()
方法访问它。
但是,通过这种方式,每个任务槽都有一个单独的实例。
为了每个任务管理器只有一个实例,您可以将其设为静态变量。但是,这样您将需要关心线程安全并制作线程安全的初始化程序。
或者,您应该再次标记变量 transient
,但不是制作 getInstance
字段,而是从 RichFunction
扩展(或您拥有的任何操作)并在 open(Configuration parameters)
方法,在初始化时被调用,适用于一次性设置工作。
Flink 中内置的使您正在做的事情变得简单的机制是 lookup join 使用 Flink SQL。
看起来像这样:
-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
id INT,
name STRING,
country STRING,
zip STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
'table-name' = 'customers'
);
-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
有关 JDBC SQL 连接器的更多信息,请参阅 the docs。