Apache Flink:将 DataStream 写入 Postgres table
Apache Flink: Write a DataStream to a Postgres table
我正在尝试编写一个流作业,将数据流汇入 postgres table。为了提供完整的信息,我的工作基于以下文章:https://tech.signavio.com/2017/postgres-flink-sink 建议使用 JDBCOutputFormat。
我的代码如下所示:
98 ...
99 String strQuery = "INSERT INTO public.alarm (entity, duration, first, type, windowsize) VALUES (?, ?, ?, 'dur', 6)";
100
101 JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
102 .setDrivername("org.postgresql.Driver")
103 .setDBUrl("jdbc:postgresql://localhost:5432/postgres?user=michel&password=polnareff")
104 .setQuery(strQuery)
105 .setSqlTypes(new int[] { Types.VARCHAR, Types.INTEGER, Types.VARCHAR}) //set the types
106 .finish();
107
108 DataStream<Row> rows = FilterStream
109 .map((tuple)-> {
110 Row row = new Row(3); // our prepared statement has 3 parameters
111 row.setField(0, tuple.f0); // first parameter is case ID
112 row.setField(1, tuple.f1); // second paramater is tracehash
113 row.setField(2, f.format(tuple.f2)); // third paramater is tracehash
114 return row;
115 });
116
117 rows.writeUsingOutputFormat(jdbcOutput);
118
119 env.execute();
120
121 }
122 }
我现在的问题是只有当我的工作停止时才会插入值(准确地说,当我从 apache flink 仪表板取消我的工作时)。
所以我的问题如下:我错过了什么吗?我应该在某处提交我插入的行吗?
此致,
伊格内修斯
正如 Chesnay 在 中所说,您必须调整批次间隔。
但这还不是全部。如果你想获得至少一次的结果,你必须将批量写入与 Flink 的检查点同步。基本上,您必须将 JdbcOutputFormat
包装在还实现了 CheckpointedFunction
接口的 SinkFunction
中。调用 snapshotState()
时,您已将批处理写入数据库。您可以查看此 pull request,它将在下一个版本中提供此功能。
Fabian的答案是实现至少一次语义的一种方法;通过将写入与 Flink 的检查点同步。但是,这有一个缺点,即您的 Sink 的数据新鲜度现在对您的检查点间隔周期很严格。
作为替代方案,您可以将具有 (entity, duration, first) 字段的元组或行存储在 Flink 自己的托管状态中,以便 Flink 负责为其设置检查点(换句话说,使您的 Sink 的状态具有容错性).为此,您需要实现 CheckpointedFunction 和 CheckpointedRestoring 接口(无需将您的写入与检查点同步。如果您不必使用 JDBCOutputFormat,您甚至可以单独执行 SQL 插入)。参见:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state。另一种解决方案是只实现 ListCheckpointed 接口(可以与已弃用的 CheckpointedRestoring 接口类似的方式使用,并支持列表样式的状态重新分配)。
我正在尝试编写一个流作业,将数据流汇入 postgres table。为了提供完整的信息,我的工作基于以下文章:https://tech.signavio.com/2017/postgres-flink-sink 建议使用 JDBCOutputFormat。
我的代码如下所示:
98 ...
99 String strQuery = "INSERT INTO public.alarm (entity, duration, first, type, windowsize) VALUES (?, ?, ?, 'dur', 6)";
100
101 JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
102 .setDrivername("org.postgresql.Driver")
103 .setDBUrl("jdbc:postgresql://localhost:5432/postgres?user=michel&password=polnareff")
104 .setQuery(strQuery)
105 .setSqlTypes(new int[] { Types.VARCHAR, Types.INTEGER, Types.VARCHAR}) //set the types
106 .finish();
107
108 DataStream<Row> rows = FilterStream
109 .map((tuple)-> {
110 Row row = new Row(3); // our prepared statement has 3 parameters
111 row.setField(0, tuple.f0); // first parameter is case ID
112 row.setField(1, tuple.f1); // second paramater is tracehash
113 row.setField(2, f.format(tuple.f2)); // third paramater is tracehash
114 return row;
115 });
116
117 rows.writeUsingOutputFormat(jdbcOutput);
118
119 env.execute();
120
121 }
122 }
我现在的问题是只有当我的工作停止时才会插入值(准确地说,当我从 apache flink 仪表板取消我的工作时)。
所以我的问题如下:我错过了什么吗?我应该在某处提交我插入的行吗?
此致, 伊格内修斯
正如 Chesnay 在
但这还不是全部。如果你想获得至少一次的结果,你必须将批量写入与 Flink 的检查点同步。基本上,您必须将 JdbcOutputFormat
包装在还实现了 CheckpointedFunction
接口的 SinkFunction
中。调用 snapshotState()
时,您已将批处理写入数据库。您可以查看此 pull request,它将在下一个版本中提供此功能。
Fabian的答案是实现至少一次语义的一种方法;通过将写入与 Flink 的检查点同步。但是,这有一个缺点,即您的 Sink 的数据新鲜度现在对您的检查点间隔周期很严格。
作为替代方案,您可以将具有 (entity, duration, first) 字段的元组或行存储在 Flink 自己的托管状态中,以便 Flink 负责为其设置检查点(换句话说,使您的 Sink 的状态具有容错性).为此,您需要实现 CheckpointedFunction 和 CheckpointedRestoring 接口(无需将您的写入与检查点同步。如果您不必使用 JDBCOutputFormat,您甚至可以单独执行 SQL 插入)。参见:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state。另一种解决方案是只实现 ListCheckpointed 接口(可以与已弃用的 CheckpointedRestoring 接口类似的方式使用,并支持列表样式的状态重新分配)。