Flink 中的动态 SQL 查询
Dynamic SQL Query in Flink
我有一个 SQL 这样的查询
String ipdetailsSql = "select sid, _zpsbd6 as ip_address, ssresp, reason, " +
"SUM(CASE WHEN botcode='r1' THEN 1 ELSE 0 END ) as icf_count, " +
"SUM(CASE WHEN botcode='r2' THEN 1 ELSE 0 END ) as dc_count, " +
"SUM(CASE WHEN botcode='r5' THEN 1 ELSE 0 END ) as badua_count, " +
"COUNT(*) as hits, TUMBLE_START(ts, INTERVAL '1' MINUTE) AS fseen " +
"from sourceTopic " +
"GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), sid, _zpsbd6, ssresp, reason";
根据用户输入,我想将 botcode='r1' 更改为给定输入。在不重新启动作业的情况下说 botcode='r10' 。
有没有办法做到这一点。我在使用流环境的 flink 1.7 上。我尝试配置流来读取输入。
但坚持如何即时更改查询。谁能帮我这个?提前致谢
流SQL查询不是执行一次就完成的,而是连续计算的声明式表达。如果不使用新查询启动新作业,则无法对该计算进行任意更改。
不过,在简单的情况下,您可以做一些事情。您可能会考虑是否可以将您的源主题与另一个有效提供一些查询参数的流连接起来。或者,您可能会发现计算所有可能需要的结果然后 select 下游实际需要的结果是负担得起的。
我有一个 SQL 这样的查询
String ipdetailsSql = "select sid, _zpsbd6 as ip_address, ssresp, reason, " +
"SUM(CASE WHEN botcode='r1' THEN 1 ELSE 0 END ) as icf_count, " +
"SUM(CASE WHEN botcode='r2' THEN 1 ELSE 0 END ) as dc_count, " +
"SUM(CASE WHEN botcode='r5' THEN 1 ELSE 0 END ) as badua_count, " +
"COUNT(*) as hits, TUMBLE_START(ts, INTERVAL '1' MINUTE) AS fseen " +
"from sourceTopic " +
"GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), sid, _zpsbd6, ssresp, reason";
根据用户输入,我想将 botcode='r1' 更改为给定输入。在不重新启动作业的情况下说 botcode='r10' 。 有没有办法做到这一点。我在使用流环境的 flink 1.7 上。我尝试配置流来读取输入。 但坚持如何即时更改查询。谁能帮我这个?提前致谢
流SQL查询不是执行一次就完成的,而是连续计算的声明式表达。如果不使用新查询启动新作业,则无法对该计算进行任意更改。
不过,在简单的情况下,您可以做一些事情。您可能会考虑是否可以将您的源主题与另一个有效提供一些查询参数的流连接起来。或者,您可能会发现计算所有可能需要的结果然后 select 下游实际需要的结果是负担得起的。