为什么可迭代操作会在 Apache Beam 函数中抛出错误?
Why do iterable operations throw errors in Apache Beam functions?
我在 Iterable
Java 集合上调用此函数,它来自 GroupByKey
函数:
static class FindCompleteOrder extends DoFn<KV<String, Iterable<Order>>, Order> {
String COMPLETE_EVENT_NAME = "COMPLETE";
@ProcessElement
public void processElement(ProcessContext c) {
Iterable<Order> orders = c.element().getValue();
boolean complete = false;
do {
try {
Order order = orders.iterator().next();
if (order.getEventName().equals(COMPLETE_EVENT_NAME)) {
complete = true;
order.setComplete(complete);
c.output(order);
}
} catch (Exception e) {
LOG.error(e.getMessage());
}
} while (complete == false && orders.iterator().hasNext());
}
}
函数迭代Orders
的列表并输出匹配指定eventName
属性的第一个实例。如果找到 Order
或迭代了整个集合,则循环结束。
随机 Order
实例在上游生成,并以每秒 2 个的速率发布到 Pub/Sub 实例,由从中调用此函数的 DataFlow 实例。约运行 15 分钟后,警告开始出现:
Processing stuck in step Find Order for at least 15m00s without outputting or completing
警告是由于 iterator().hasNext()
或 iterator().next()
中的偶发故障而发出的。最终结果是整个流水线停滞。关联的管道阶段从不发出输出。
用标准 for 循环替换循环可以解决问题。然而,这样做意味着迭代整个集合;我更愿意在找到合适的元素时结束循环,因此是 do-while 循环。
我很想知道为什么 iterator
操作会导致管道停滞。 FAIA Iterable
集合是不可变的,不会被其他进程修改。
我是 运行 Java 8 和 Apache Beam 2.6 Windows.
每次调用 orders.iterator()
时,您都会创建一个 new 迭代器,从第一个顺序开始。这意味着您在循环中一遍又一遍地处理相同的订单。如果有多个订单,您对 hasNext()
的调用将始终为真。因此,如果您有多个订单或您的第一个订单未设置 complete
,则循环将永远 运行,这就是您超时的原因。
相反,您应该调用 iterator()
一次并存储迭代器而不是可迭代的,使用它来循环:
static class FindCompleteOrder extends DoFn<KV<String, Iterable<Order>>, Order> {
String COMPLETE_EVENT_NAME = "COMPLETE";
@ProcessElement
public void processElement(ProcessContext c) {
Iterator<Order> orders = c.element().getValue().iterator();
boolean complete = false;
do {
try {
Order order = orders.next();
if (order.getEventName().equals(COMPLETE_EVENT_NAME)) {
complete = true;
order.setComplete(complete);
c.output(order);
}
} catch (Exception e) {
LOG.error(e.getMessage());
}
} while (complete == false && orders.hasNext());
}
}
我在 Iterable
Java 集合上调用此函数,它来自 GroupByKey
函数:
static class FindCompleteOrder extends DoFn<KV<String, Iterable<Order>>, Order> {
String COMPLETE_EVENT_NAME = "COMPLETE";
@ProcessElement
public void processElement(ProcessContext c) {
Iterable<Order> orders = c.element().getValue();
boolean complete = false;
do {
try {
Order order = orders.iterator().next();
if (order.getEventName().equals(COMPLETE_EVENT_NAME)) {
complete = true;
order.setComplete(complete);
c.output(order);
}
} catch (Exception e) {
LOG.error(e.getMessage());
}
} while (complete == false && orders.iterator().hasNext());
}
}
函数迭代Orders
的列表并输出匹配指定eventName
属性的第一个实例。如果找到 Order
或迭代了整个集合,则循环结束。
随机 Order
实例在上游生成,并以每秒 2 个的速率发布到 Pub/Sub 实例,由从中调用此函数的 DataFlow 实例。约运行 15 分钟后,警告开始出现:
Processing stuck in step Find Order for at least 15m00s without outputting or completing
警告是由于 iterator().hasNext()
或 iterator().next()
中的偶发故障而发出的。最终结果是整个流水线停滞。关联的管道阶段从不发出输出。
用标准 for 循环替换循环可以解决问题。然而,这样做意味着迭代整个集合;我更愿意在找到合适的元素时结束循环,因此是 do-while 循环。
我很想知道为什么 iterator
操作会导致管道停滞。 FAIA Iterable
集合是不可变的,不会被其他进程修改。
我是 运行 Java 8 和 Apache Beam 2.6 Windows.
每次调用 orders.iterator()
时,您都会创建一个 new 迭代器,从第一个顺序开始。这意味着您在循环中一遍又一遍地处理相同的订单。如果有多个订单,您对 hasNext()
的调用将始终为真。因此,如果您有多个订单或您的第一个订单未设置 complete
,则循环将永远 运行,这就是您超时的原因。
相反,您应该调用 iterator()
一次并存储迭代器而不是可迭代的,使用它来循环:
static class FindCompleteOrder extends DoFn<KV<String, Iterable<Order>>, Order> {
String COMPLETE_EVENT_NAME = "COMPLETE";
@ProcessElement
public void processElement(ProcessContext c) {
Iterator<Order> orders = c.element().getValue().iterator();
boolean complete = false;
do {
try {
Order order = orders.next();
if (order.getEventName().equals(COMPLETE_EVENT_NAME)) {
complete = true;
order.setComplete(complete);
c.output(order);
}
} catch (Exception e) {
LOG.error(e.getMessage());
}
} while (complete == false && orders.hasNext());
}
}