DoFn 的处理函数未执行
process function of DoFn not executing
我正在尝试编写一个 Apache Beam 变换(transform),例如:
util.py
class GroupIntoBatches(PTransform):
    """Composite transform that applies ``_GroupIntoBatchesDoFn`` to its input.

    The coder looked up for the input PCollection must report itself as a
    KV coder; its key and value coders are handed to the DoFn together
    with ``batch_size``.
    """

    def __init__(self, batch_size):
        # Stored so that expand() can forward it to the DoFn.
        self.batch_size = batch_size

    @staticmethod
    def of_size(batch_size):
        """Factory shorthand for ``GroupIntoBatches(batch_size)``."""
        return GroupIntoBatches(batch_size)

    def expand(self, pcoll):
        """Apply the batching DoFn to ``pcoll`` through ``ParDo``.

        Raises:
            ValueError: if the coder obtained for ``pcoll`` is not a KV coder.
        """
        kv_coder = coders.registry.get_coder(pcoll)
        if kv_coder.is_kv_coder():
            batching_fn = _GroupIntoBatchesDoFn(
                self.batch_size,
                kv_coder.key_coder(),
                kv_coder.value_coder(),
            )
            return pcoll | ParDo(batching_fn)
        raise ValueError(
            'coder specified in the input PCollection is not a KvCoder')
class _GroupIntoBatchesDoFn(DoFn):
    """DoFn applied by ``GroupIntoBatches``; at present only a diagnostic stub.

    Args:
        batch_size: stored on the instance as ``batch_size``.
        input_key_coder: accepted but not used by this class.
        input_value_coder: coder passed to the ``BagStateSpec`` that is
            stored as ``batch_spec``.
    """

    def __init__(self, batch_size, input_key_coder, input_value_coder):
        self.batch_size = batch_size
        self.batch_spec = BagStateSpec("GroupIntoBatches", input_value_coder)

    def process(self, element):
        """Raise unconditionally; the print below is never reached."""
        # Looks like a deliberate probe: it fails loudly as soon as
        # process() is invoked, which shows whether the DoFn runs at all.
        raise Exception("Not getting to this point")  # This is not working
        # Call form instead of the Python 2 print statement, so the module
        # also parses on Python 3. Output for a single argument is unchanged.
        print(element)
然后我尝试通过下面的测试用例来执行这个变换:
util_test.py
class GroupIntoBatchesTest(unittest.TestCase):
    """Test case that applies ``GroupIntoBatches`` to a small keyed data set."""

    # Number of (key, value) pairs produced by _create_test_data().
    NUM_ELEMENTS = 10
    # Batch size handed to GroupIntoBatches.of_size().
    BATCH_SIZE = 5

    @staticmethod
    def _create_test_data():
        """Return NUM_ELEMENTS pairs ``("key", name)``, cycling through the names."""
        names = [
            "Einstein",
            "Darwin",
            "Copernicus",
            "Pasteur",
            "Curie",
            "Faraday",
            "Newton",
            "Bohr",
            "Galilei",
            "Maxwell",
        ]
        return [
            ("key", names[n % len(names)])
            for n in range(GroupIntoBatchesTest.NUM_ELEMENTS)
        ]

    def test_in_global_window(self):
        """Apply Create followed by GroupIntoBatches to a TestPipeline."""
        pipeline = TestPipeline()
        source = beam.Create(GroupIntoBatchesTest._create_test_data())
        batcher = util.GroupIntoBatches.of_size(GroupIntoBatchesTest.BATCH_SIZE)
        collection = pipeline | source | batcher
我的问题是:为什么 _GroupIntoBatchesDoFn 的 process 函数没有被调用?
运行测试用例时,我得到了如下结果:
test_in_global_window (apache_beam.transforms.util_test.GroupIntoBatchesTest) ... ok
您的测试正在构建管道,但并未实际执行它。你需要写
pipeline = TestPipeline()
collection = pipeline | ...
# Applying transforms only builds the pipeline; run() actually executes it.
pipeline.run()
或者,也可以写成:
with TestPipeline() as pipeline:
    collection = pipeline | ...
    # run is implicitly called on exit of the with block
(您可能还对 BatchElements 变换感兴趣。)
我正在尝试编写一个 Apache Beam 变换(transform),例如:
util.py
class GroupIntoBatches(PTransform):
    """Composite transform that applies ``_GroupIntoBatchesDoFn`` to its input.

    The coder looked up for the input PCollection must report itself as a
    KV coder; its key and value coders are handed to the DoFn together
    with ``batch_size``.
    """

    def __init__(self, batch_size):
        # Stored so that expand() can forward it to the DoFn.
        self.batch_size = batch_size

    @staticmethod
    def of_size(batch_size):
        """Factory shorthand for ``GroupIntoBatches(batch_size)``."""
        return GroupIntoBatches(batch_size)

    def expand(self, pcoll):
        """Apply the batching DoFn to ``pcoll`` through ``ParDo``.

        Raises:
            ValueError: if the coder obtained for ``pcoll`` is not a KV coder.
        """
        kv_coder = coders.registry.get_coder(pcoll)
        if kv_coder.is_kv_coder():
            batching_fn = _GroupIntoBatchesDoFn(
                self.batch_size,
                kv_coder.key_coder(),
                kv_coder.value_coder(),
            )
            return pcoll | ParDo(batching_fn)
        raise ValueError(
            'coder specified in the input PCollection is not a KvCoder')
class _GroupIntoBatchesDoFn(DoFn):
    """DoFn applied by ``GroupIntoBatches``; at present only a diagnostic stub.

    Args:
        batch_size: stored on the instance as ``batch_size``.
        input_key_coder: accepted but not used by this class.
        input_value_coder: coder passed to the ``BagStateSpec`` that is
            stored as ``batch_spec``.
    """

    def __init__(self, batch_size, input_key_coder, input_value_coder):
        self.batch_size = batch_size
        self.batch_spec = BagStateSpec("GroupIntoBatches", input_value_coder)

    def process(self, element):
        """Raise unconditionally; the print below is never reached."""
        # Looks like a deliberate probe: it fails loudly as soon as
        # process() is invoked, which shows whether the DoFn runs at all.
        raise Exception("Not getting to this point")  # This is not working
        # Call form instead of the Python 2 print statement, so the module
        # also parses on Python 3. Output for a single argument is unchanged.
        print(element)
然后我尝试通过下面的测试用例来执行这个变换:
util_test.py
class GroupIntoBatchesTest(unittest.TestCase):
    """Test case that applies ``GroupIntoBatches`` to a small keyed data set."""

    # Number of (key, value) pairs produced by _create_test_data().
    NUM_ELEMENTS = 10
    # Batch size handed to GroupIntoBatches.of_size().
    BATCH_SIZE = 5

    @staticmethod
    def _create_test_data():
        """Return NUM_ELEMENTS pairs ``("key", name)``, cycling through the names."""
        names = [
            "Einstein",
            "Darwin",
            "Copernicus",
            "Pasteur",
            "Curie",
            "Faraday",
            "Newton",
            "Bohr",
            "Galilei",
            "Maxwell",
        ]
        return [
            ("key", names[n % len(names)])
            for n in range(GroupIntoBatchesTest.NUM_ELEMENTS)
        ]

    def test_in_global_window(self):
        """Apply Create followed by GroupIntoBatches to a TestPipeline."""
        pipeline = TestPipeline()
        source = beam.Create(GroupIntoBatchesTest._create_test_data())
        batcher = util.GroupIntoBatches.of_size(GroupIntoBatchesTest.BATCH_SIZE)
        collection = pipeline | source | batcher
我的问题是:为什么 _GroupIntoBatchesDoFn 的 process 函数没有被调用?
运行测试用例时,我得到了如下结果:
test_in_global_window (apache_beam.transforms.util_test.GroupIntoBatchesTest) ... ok
您的测试正在构建管道,但并未实际执行它。你需要写
pipeline = TestPipeline()
collection = pipeline | ...
# Applying transforms only builds the pipeline; run() actually executes it.
pipeline.run()
或者,也可以写成:
with TestPipeline() as pipeline:
    collection = pipeline | ...
    # run is implicitly called on exit of the with block
(您可能还对 BatchElements 变换感兴趣。)