Apache beam 列表到 PCollection
Apache beam list to PCollection
我的输入是 json 的列表,我想要一个多元素 PCollection。这是我的代码:
def parse_json(data):
import json
for i in json.loads(data):
return i
data = (p
| "Read text" >> beam.io.textio.ReadFromText(f'gs://{bucket_name}/not_processed/2020-06-08T23:59:59.999Z__rms004_m1__not_sent_msg.txt')
| "Parse json" >> beam.Map(parse_json))
当列表由 2 个元素组成时,我只得到列表的第一个元素。
如何实现?
我知道了。
Apache Beam 中有一个名为 ParDo 的函数就是为了这个。
def parse_json(data):
import json
return json.loads(data)
data = (p
| "Read text" >> beam.io.textio.ReadFromText(f'gs://{bucket_name}/not_processed/2020-06-08T23:59:59.999Z__rms004_m1__not_sent_msg.txt')
| "Parse json" >> beam.ParDo(parse_json))
我的输入是 json 的列表,我想要一个多元素 PCollection。这是我的代码:
def parse_json(data):
import json
for i in json.loads(data):
return i
data = (p
| "Read text" >> beam.io.textio.ReadFromText(f'gs://{bucket_name}/not_processed/2020-06-08T23:59:59.999Z__rms004_m1__not_sent_msg.txt')
| "Parse json" >> beam.Map(parse_json))
当列表由 2 个元素组成时,我只得到列表的第一个元素。
如何实现?
我知道了。
Apache Beam 中有一个名为 ParDo 的函数就是为了这个。
def parse_json(data):
import json
return json.loads(data)
data = (p
| "Read text" >> beam.io.textio.ReadFromText(f'gs://{bucket_name}/not_processed/2020-06-08T23:59:59.999Z__rms004_m1__not_sent_msg.txt')
| "Parse json" >> beam.ParDo(parse_json))