How to properly test PCollection length when unit testing Apache Beam

I'd like to know the best way to test the length of the output produced by a Beam pipeline.

I have some test code like this:

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

test_data = [
    {'kind': 'storage#object', 'name': 'file1.doc', 'contentType': 'application/octet-stream', 'bucket': 'bucket123'},
    {'kind': 'storage#object', 'name': 'file2.pdf', 'contentType': 'application/pdf', 'bucket': 'bucket234'},
    {'kind': 'storage#object', 'name': 'file3.msg', 'contentType': 'message/rfc822', 'bucket': 'bucket345'}
]

with TestPipeline() as p:
   output = (p 
             | beam.Create(test_data)
             | beam.ParDo(DoFn_To_Test()).with_outputs('ok','error')
   )

I want to check that every element of the test_data list ends up in 'output.ok'. I thought the way to do that was to count them, like this:

with TestPipeline() as p:
   output = (p 
             | beam.Create(test_data)
             | beam.ParDo(DoFn_To_Test()).with_outputs('ok','error')
   )

   okay_count = (output.ok | beam.Map(lambda x: ('dummy_key', x))
                 | beam.GroupByKey()  # This gets ('dummy_key', [element1, element2, ...])
                 | beam.Map(lambda x: len(x[1]))  # Drop the key and get the length of the list
   )

   # And finally, assert the count is correct:
   assert_that(okay_count, equal_to([len(test_data)]))

This works, but I feel it isn't the best way, and I'm sure there are other ways to do it.

Best option (so far)

This is the best option suggested so far: use beam.combiners.Count.Globally().

with TestPipeline() as p:
   output = (p 
             | beam.Create(test_data)
             | beam.ParDo(DoFn_To_Test()).with_outputs('ok','error')
   )

   # Count only the 'ok' tagged output, not the whole multi-output result.
   okay_count = output.ok | beam.combiners.Count.Globally()
   assert_that(okay_count, equal_to([len(test_data)]))

You answered your own question in the question itself. Posting it here as an answer:

with TestPipeline() as p:
   output = (p 
             | beam.Create(test_data)
             | beam.ParDo(DoFn_To_Test()).with_outputs('ok','error')
   )

   # Count only the 'ok' tagged output, not the whole multi-output result.
   okay_count = output.ok | beam.combiners.Count.Globally()
   assert_that(okay_count, equal_to([len(test_data)]))