如何从 flink 作业中查找数据库数据

How to lookup db data from a flink job

我是 Apache Flink 的新手。

我正在构建一个flink算子。需要从关系存储中获取数据以处理流数据。这是一个小的快速查找。我使用 spring-jdbc 客户端进行查找。

public class FilterCriteriaEvaluator extends KeyedProcessFunction<Long, DeviceAttrUpdate, FilterCriteriaEval> {


private NamedParameterJdbcTemplate namedParameterJdbcTemplate;

public FilterCriteriaEvaluator(DataSource dataSource) {
    namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
}
...

然而,在集群上执行此作业时出现以下错误

Caused by: java.io.NotSerializableException: org.springframework.jdbc.core.JdbcTemplate

spring 数据库客户端不可序列化。然后我考虑直接使用 java.sql.DataSource 。但这也不是可序列化的。

将数据库客户端标记为瞬态没有帮助,因为它不会序列化为运算符对象的一部分,我在集群上执行作业时得到 NPE

我在这里错过了什么?我如何从 flink 作业中进行数据库查找?

作为解决方法,您可以将 JdbcTemplate 标记为瞬态并使其可延迟初始化 - 类似于


private transient JdbcTemplate instance = null;

// Not thread-safe
public JdbcTemplate getInstance() {
    if(instance == null){
        // init
    }

    return instance;
}

并通过 getInstance() 方法访问它。 但是,通过这种方式,每个任务槽都有一个单独的实例。

为了每个任务管理器只有一个实例,您可以将其设为静态变量。但是,这样您将需要关心线程安全并制作线程安全的初始化程序。

或者,您应该再次标记变量 transient,但不是制作 getInstance 字段,而是从 RichFunction 扩展(或您拥有的任何操作)并在 open(Configuration parameters) 方法,在初始化时被调用,适用于一次性设置工作。

Flink 中内置的使您正在做的事情变得简单的机制是 lookup join 使用 Flink SQL。

看起来像这样:

-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers'
);

-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

有关 JDBC SQL 连接器的更多信息,请参阅 the docs