flink:它正在丢失记录吗?
flink: is it losing records?
我的拓扑结构是这样的:kafka(p:6)->reduce(p:6)->db writer(p:12)
(其中 p: 是并行度)。
- 我 运行 在单个节点 "cluster" 上
taskmanager.numberOfTaskSlots: 30
- 我知道我的 kafka 源每分钟产生约 650 万条记录
- kafka 'reader' 的并行度等于 kafka 分区的数量
当我观察这项工作(通过 flink UI)约 1 分钟时,这些是我看到的值:
- kafka -> 减少:已发送约 150 万条记录(减少 > 4 倍)
- 减少(窗口聚合 5 秒)-> 数据库写入 ~114K 记录已发送(关闭 > 2x)1
- db write --> records received: ~23K (off by > 5x) 2
(其他部分的发送/接收值之间存在较小的差异,但我可以将这些归因于测量误差)
问题:
1. 剩下的记录在哪里?
2. 这台机器上的负载永远不会超过 1.5,而这是 运行。还有其他限制因素吗?
3. 我是否误读了 UI 中的值?
Java 8
Flink 1.0(最新github)
机器:32 核 / 96 Gb RAM
1这个可以用聚合过程来解释
2这个值与写入数据库的内容一致。
Flink 不会丢失记录,它们只是在飞行中被缓冲,或者在 Kafka 中停留更长时间。从数字来看,您似乎遇到了 背压。
你可以看到 "reducer" 发出了很多 "db writer" 还没有收到的记录。在那种情况下,这些记录仍在操作员之间的通信通道的缓冲区中。这些通道的缓冲量有限(取决于配置的缓冲区数量,通常为几 MB)。对于小记录,它们可能会保存多个 10k 记录。
如果一个运营商发送的记录数量持续明显落后于接收运营商接收的记录数量,这表明接收方(此处为 "db writer")无法跟上数据速率.可能是因为数据库处理插入的速度不够快(太同步,太细粒度提交?),也许 "db writer" 和数据库之间的网络已经饱和。
在这种情况下,"db writer" 将背压减速器,最终也会背压 Kafka 源。
要尝试在没有数据库背压的情况下数据速率是多少,您可以尝试一个 "db writer" 简单地删除所有记录的实验。
我的拓扑结构是这样的:kafka(p:6)->reduce(p:6)->db writer(p:12)
(其中 p: 是并行度)。
- 我 运行 在单个节点 "cluster" 上
taskmanager.numberOfTaskSlots: 30
- 我知道我的 kafka 源每分钟产生约 650 万条记录
- kafka 'reader' 的并行度等于 kafka 分区的数量
当我观察这项工作(通过 flink UI)约 1 分钟时,这些是我看到的值:
- kafka -> 减少:已发送约 150 万条记录(减少 > 4 倍)
- 减少(窗口聚合 5 秒)-> 数据库写入 ~114K 记录已发送(关闭 > 2x)1
- db write --> records received: ~23K (off by > 5x) 2
(其他部分的发送/接收值之间存在较小的差异,但我可以将这些归因于测量误差)
问题:
1. 剩下的记录在哪里?
2. 这台机器上的负载永远不会超过 1.5,而这是 运行。还有其他限制因素吗?
3. 我是否误读了 UI 中的值?
Java 8
Flink 1.0(最新github)
机器:32 核 / 96 Gb RAM
1这个可以用聚合过程来解释
2这个值与写入数据库的内容一致。
Flink 不会丢失记录,它们只是在飞行中被缓冲,或者在 Kafka 中停留更长时间。从数字来看,您似乎遇到了 背压。
你可以看到 "reducer" 发出了很多 "db writer" 还没有收到的记录。在那种情况下,这些记录仍在操作员之间的通信通道的缓冲区中。这些通道的缓冲量有限(取决于配置的缓冲区数量,通常为几 MB)。对于小记录,它们可能会保存多个 10k 记录。
如果一个运营商发送的记录数量持续明显落后于接收运营商接收的记录数量,这表明接收方(此处为 "db writer")无法跟上数据速率.可能是因为数据库处理插入的速度不够快(太同步,太细粒度提交?),也许 "db writer" 和数据库之间的网络已经饱和。
在这种情况下,"db writer" 将背压减速器,最终也会背压 Kafka 源。
要尝试在没有数据库背压的情况下数据速率是多少,您可以尝试一个 "db writer" 简单地删除所有记录的实验。