读取缓慢变化查找和丰富流式输入 collection 的最佳方法是什么?
Which is the best approach to read a slowing change lookup and enrich a streaming input collection?
我正在使用 Apache Beam,流式传输 collection 为 1.5GB。
我的查找 table 是一个 JDBCio mysql 响应。
当我 运行 没有侧输入的管道时,我的工作将在大约 2 分钟内完成。当我 运行 我的工作与侧面输入时,我的工作永远不会完成,卡住并死掉。
这是我用来存储查询的代码(约 1M 条记录)
PCollectionView<Map<String,String>> sideData = pipeline.apply(JdbcIO.<KV<String, String>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver", "jdbc:mysql://ip")
.withUsername("username")
.withPassword("password"))
.withQuery("select a_number from cell")
.withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
.withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() {
public KV<String, String> mapRow(ResultSet resultSet) throws Exception {
return KV.of(resultSet.getString(1), resultSet.getString(1));
}
})).apply(View.asMap());
这是我的流媒体代码collection
pipeline
.apply("ReadMyFile", TextIO.read().from("/home/data/**")
.watchForNewFiles(Duration.standardSeconds(60), Watch.Growth.<String>never()))
.apply(Window.<String>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
.accumulatingFiredPanes()
.withAllowedLateness(ONE_DAY))
这是我的 parDo 代码,用于迭代每个事件行(1000 万条记录)
.apply(ParDo.of(new DoFn<KV<String,Integer>,KV<String,Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String,Integer> i = c.element();
String sideInputData = c.sideInput(sideData).get(i.getKey());
if (sideInputData == null) {
c.output(i);
}
}
}).withSideInputs(sideData));
我正在使用 flink 集群,但使用直接 运行ner 输出相同。
集群:
2 cpu
6芯
24GB 内存
我做错了什么?
I've followed this
如果它以更少的数据运行,我怀疑程序正在用完 java 进程的所有内存。您可以通过 JVisualVM 或 JConsole 对其进行监控。有很多文章涵盖了这个问题,我只是通过快速 google 搜索偶然发现了 this one。
如果内存用完,您的 java 进程主要忙于清理内存,您会看到性能大幅下降。在某个时候,java 放弃并失败了。
要解决此问题,增加 java 堆大小应该就足够了。如何增加取决于执行它的方式和位置。查看 Java 的 -Xmx
参数或 beam 中的一些堆选项。
解决方案是创建一个 "cache" MAP。
sideInput只触发一次,然后我把它缓存到一个地图等效结构中。
因此,我避免为每个 processElement 执行 sideInput。
.apply(ParDo.of(new DoFn<KV<String,Integer>,KV<String,Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
if (isFirstTime) {
myList = c.sideInput(sideData);
}
isFirstTime = false;
boolean result = myList.containsKey(c.element().getKey());
if (result == false) {
c.output(i);
}
}
}).withSideInputs(sideData));
我正在使用 Apache Beam,流式传输 collection 为 1.5GB。 我的查找 table 是一个 JDBCio mysql 响应。
当我 运行 没有侧输入的管道时,我的工作将在大约 2 分钟内完成。当我 运行 我的工作与侧面输入时,我的工作永远不会完成,卡住并死掉。
这是我用来存储查询的代码(约 1M 条记录)
PCollectionView<Map<String,String>> sideData = pipeline.apply(JdbcIO.<KV<String, String>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver", "jdbc:mysql://ip")
.withUsername("username")
.withPassword("password"))
.withQuery("select a_number from cell")
.withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
.withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() {
public KV<String, String> mapRow(ResultSet resultSet) throws Exception {
return KV.of(resultSet.getString(1), resultSet.getString(1));
}
})).apply(View.asMap());
这是我的流媒体代码collection
pipeline
.apply("ReadMyFile", TextIO.read().from("/home/data/**")
.watchForNewFiles(Duration.standardSeconds(60), Watch.Growth.<String>never()))
.apply(Window.<String>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
.accumulatingFiredPanes()
.withAllowedLateness(ONE_DAY))
这是我的 parDo 代码,用于迭代每个事件行(1000 万条记录)
.apply(ParDo.of(new DoFn<KV<String,Integer>,KV<String,Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String,Integer> i = c.element();
String sideInputData = c.sideInput(sideData).get(i.getKey());
if (sideInputData == null) {
c.output(i);
}
}
}).withSideInputs(sideData));
我正在使用 flink 集群,但使用直接 运行ner 输出相同。
集群:
2 cpu 6芯 24GB 内存
我做错了什么? I've followed this
如果它以更少的数据运行,我怀疑程序正在用完 java 进程的所有内存。您可以通过 JVisualVM 或 JConsole 对其进行监控。有很多文章涵盖了这个问题,我只是通过快速 google 搜索偶然发现了 this one。
如果内存用完,您的 java 进程主要忙于清理内存,您会看到性能大幅下降。在某个时候,java 放弃并失败了。
要解决此问题,增加 java 堆大小应该就足够了。如何增加取决于执行它的方式和位置。查看 Java 的 -Xmx
参数或 beam 中的一些堆选项。
解决方案是创建一个 "cache" MAP。
sideInput只触发一次,然后我把它缓存到一个地图等效结构中。
因此,我避免为每个 processElement 执行 sideInput。
.apply(ParDo.of(new DoFn<KV<String,Integer>,KV<String,Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
if (isFirstTime) {
myList = c.sideInput(sideData);
}
isFirstTime = false;
boolean result = myList.containsKey(c.element().getKey());
if (result == false) {
c.output(i);
}
}
}).withSideInputs(sideData));