How to handle large in-memory data in Apache Beam Pipeline to run on Google Dataflow Runner
I have the following simple code. The in-memory variable word_to_id is about 50 MB. Because the dictionary is captured by extract_word_ids, it gets serialized into the job request, and submitting the pipeline to the Dataflow Runner fails with:
413 Request Entity Too Large
word_to_id = {tok: idx for idx, tok in enumerate(vocab)}

def extract_word_ids(tokens):
    # Keep only tokens that are in the vocabulary (this also keeps id 0).
    return [word_to_id[w] for w in tokens if w in word_to_id]

with beam.pipeline.Pipeline(options=get_pipeline_option()) as p:
    lines = p | 'Read' >> beam.io.ReadFromText(path)
    word_ids = (
        lines
        | 'TokenizeLines' >> beam.Map(words)
        | 'IntergerizeTokens' >> beam.Map(extract_word_ids)
    )
Please suggest an alternative solution for this.
You can use a GCS bucket as the source for both the text and the variable, and pass the variable to the pipeline as a side input. The side input can be consumed as a list, a dict, or a singleton.
Here is a word-count example that removes stop words stored in a GCS bucket:
import re

import apache_beam as beam
from apache_beam import FlatMap
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.transforms.combiners import Count

with beam.Pipeline() as p:
    path = "gs://dataflow-samples/shakespeare/kinglear.txt"
    stopwords_path = "<BUCKET/stopwords>"
    output_path = "<BUCKET>"

    def split_words(text, stopwords):
        words = re.split(r'\W+', text)
        try:
            words.remove('')
        except ValueError:
            pass
        return [x for x in words if x.lower() not in stopwords]

    # The stop-word file is read into its own PCollection and passed as a side input.
    stopwords_p = (p
                   | "Read Stop Words" >> ReadFromText(stopwords_path)
                   | FlatMap(lambda x: x.split(", ")))

    text = p | "Read Text" >> ReadFromText(path)

    (text
     | "Split Words" >> FlatMap(split_words, stopwords=beam.pvalue.AsList(stopwords_p))
     | "Count" >> Count.PerElement()
     | "Write" >> WriteToText(file_path_prefix=output_path, file_name_suffix=".txt"))
In the end I managed to solve it and it works. I use DoFn.setup to initialize my variable from the GCS bucket.
import apache_beam as beam
import tensorflow as tf

class IntergerizeTokens(beam.DoFn):
    """Beam line processing function."""

    def __init__(self, vocab_filename):
        self.vocab_filename = vocab_filename

    def setup(self):
        # Runs once per DoFn instance on the worker: read the vocabulary
        # (assumed to be one token per line) from the GCS bucket and build
        # the token-to-id mapping in memory.
        with tf.io.gfile.GFile(tf.io.gfile.glob(self.vocab_filename + '*')[0], 'r') as fh:
            vocab = [line.strip() for line in fh]
        self.word_to_id = {tok: idx for idx, tok in enumerate(vocab)}
        print('Setup done!')

    def process(self, tokens):
        """Takes a list of tokens and yields the list of their vocabulary ids."""
        yield [self.word_to_id[w] for w in tokens if w in self.word_to_id]
Now pass the DoFn to ParDo:
with beam.pipeline.Pipeline(options=get_pipeline_option()) as p:
    lines = p | 'Read' >> beam.io.ReadFromText(path)
    word_ids = (
        lines
        | 'TokenizeLines' >> beam.Map(words)
        | 'IntergerizeTokens' >> beam.ParDo(IntergerizeTokens(vocab_temp_path))
    )
This is one way to solve it. I think DoFn.setup is a good place to initialize large variables in memory.
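If you want to sanity-check the DoFn locally before submitting to Dataflow, something like the following should work (a sketch only: the /tmp/vocab.txt path and the tiny three-token vocabulary are made up, and tf.io.gfile accepts local paths as well as gs:// URIs):

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

# Write a tiny vocabulary file, one token per line (hypothetical path).
with open('/tmp/vocab.txt', 'w') as f:
    f.write('the\ncat\nsat\n')

with TestPipeline() as p:
    ids = (p
           | beam.Create([['the', 'cat', 'sat', 'dinosaur']])
           | beam.ParDo(IntergerizeTokens('/tmp/vocab.txt')))
    # 'dinosaur' is not in the vocabulary, so only the three known ids remain.
    assert_that(ids, equal_to([[0, 1, 2]]))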