为什么可迭代操作会在 Apache Beam 函数中抛出错误?

Why do iterable operations throw errors in Apache Beam functions?

我在 Iterable Java 集合上调用此函数,它来自 GroupByKey 函数:

static class FindCompleteOrder extends DoFn<KV<String, Iterable<Order>>, Order> {
    String COMPLETE_EVENT_NAME = "COMPLETE";

    @ProcessElement
    public void processElement(ProcessContext c) {
        Iterable<Order> orders = c.element().getValue();
        boolean complete = false;

        do {
            try {
                Order order = orders.iterator().next();

                if (order.getEventName().equals(COMPLETE_EVENT_NAME)) {
                    complete = true;
                    order.setComplete(complete);
                    c.output(order);
                }
            } catch (Exception e) {
                LOG.error(e.getMessage());
            }
        } while (complete == false && orders.iterator().hasNext());
    }
}

函数迭代Orders的列表并输出匹配指定eventName属性的第一个实例。如果找到 Order 或迭代了整个集合,则循环结束。

随机 Order 实例在上游生成,并以每秒 2 个的速率发布到 Pub/Sub 实例,由从中调用此函数的 DataFlow 实例。约运行 15 分钟后,警告开始出现:

Processing stuck in step Find Order for at least 15m00s without outputting or completing

警告是由于 iterator().hasNext()iterator().next() 中的偶发故障而发出的。最终结果是整个流水线停滞。关联的管道阶段从不发出输出。

用标准 for 循环替换循环可以解决问题。然而,这样做意味着迭代整个集合;我更愿意在找到合适的元素时结束循环,因此是 do-while 循环。

我很想知道为什么 iterator 操作会导致管道停滞。 FAIA Iterable 集合是不可变的,不会被其他进程修改。

我是 运行 Java 8Apache Beam 2.6 Windows.

每次调用 orders.iterator() 时,您都会创建一个 new 迭代器,从第一个顺序开始。这意味着您在循环中一遍又一遍地处理相同的订单。如果有多个订单,您对 hasNext() 的调用将始终为真。因此,如果您有多个订单或您的第一个订单未设置 complete,则循环将永远 运行,这就是您超时的原因。

相反,您应该调用 iterator() 一次并存储迭代器而不是可迭代的,使用它来循环:

static class FindCompleteOrder extends DoFn<KV<String, Iterable<Order>>, Order> {
    String COMPLETE_EVENT_NAME = "COMPLETE";

    @ProcessElement
    public void processElement(ProcessContext c) {
        Iterator<Order> orders = c.element().getValue().iterator();
        boolean complete = false;

        do {
            try {
                Order order = orders.next();

                if (order.getEventName().equals(COMPLETE_EVENT_NAME)) {
                    complete = true;
                    order.setComplete(complete);
                    c.output(order);
                }
            } catch (Exception e) {
                LOG.error(e.getMessage());
            }
        } while (complete == false && orders.hasNext());
    }
}