flink 如何与 MySQL 交互以实现与 mysql 的时间连接
how flink interacts with MySQL for the temporal join with mysql
我正在阅读
它使用 MySQL 作为临时 table 中的查找 table 连接为
-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
id INT,
name STRING,
country STRING,
zip STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
'table-name' = 'customers'
);
-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
我想知道 flink 如何与 MySQL 交互,以及 mysql 端是否存在临时连接 mysql 的性能问题 mysql。
基本问题是 flink 如何与 mysql 进行时间连接。
您将在 Table / JDBC 连接器的文档中找到一些相关详细信息:https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/table/jdbc/#features。特别参见描述查找缓存的部分,其中说
JDBC connector can be used in temporal join as a lookup source (aka. dimension table). Currently, only sync lookup mode is supported.
By default, lookup cache is not enabled. You can enable it by setting both lookup.cache.max-rows and lookup.cache.ttl.
The lookup cache is used to improve performance of temporal join the JDBC connector. By default, lookup cache is not enabled, so all the requests are sent to external database. When lookup cache is enabled, each process (i.e. TaskManager) will hold a cache. Flink will lookup the cache first, and only send requests to external database when cache missing, and update cache with the rows returned. The oldest rows in cache will be expired when the cache hit to the max cached rows lookup.cache.max-rows or when the row exceeds the max time to live lookup.cache.ttl. The cached rows might not be the latest, users can tune lookup.cache.ttl to a smaller value to have a better fresh data, but this may increase the number of requests send to database. So this is a balance between throughput and correctness.
我正在阅读
它使用 MySQL 作为临时 table 中的查找 table 连接为
-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
id INT,
name STRING,
country STRING,
zip STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
'table-name' = 'customers'
);
-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
我想知道 flink 如何与 MySQL 交互,以及 mysql 端是否存在临时连接 mysql 的性能问题 mysql。
基本问题是 flink 如何与 mysql 进行时间连接。
您将在 Table / JDBC 连接器的文档中找到一些相关详细信息:https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/table/jdbc/#features。特别参见描述查找缓存的部分,其中说
JDBC connector can be used in temporal join as a lookup source (aka. dimension table). Currently, only sync lookup mode is supported.
By default, lookup cache is not enabled. You can enable it by setting both lookup.cache.max-rows and lookup.cache.ttl.
The lookup cache is used to improve performance of temporal join the JDBC connector. By default, lookup cache is not enabled, so all the requests are sent to external database. When lookup cache is enabled, each process (i.e. TaskManager) will hold a cache. Flink will lookup the cache first, and only send requests to external database when cache missing, and update cache with the rows returned. The oldest rows in cache will be expired when the cache hit to the max cached rows lookup.cache.max-rows or when the row exceeds the max time to live lookup.cache.ttl. The cached rows might not be the latest, users can tune lookup.cache.ttl to a smaller value to have a better fresh data, but this may increase the number of requests send to database. So this is a balance between throughput and correctness.