从 Apache Beam 读取 CSV 并写入 BigQuery
Read CSV and write to BigQuery from Apache Beam
我有一个 GCS 存储桶，我试图从中读取大约 20 万个文件，然后将它们写入 BigQuery。问题是我在创建能与这段代码良好配合的 PCollection 时遇到了困难。我正在参考 this 教程。
我有这个代码:
from __future__ import absolute_import
import argparse
import logging
import os
from past.builtins import unicode
import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from google.cloud import storage
import regex as re
# storage_client = storage.Client()
# bucket = storage_client.get_bucket('mybucket')
#
# blobs = bucket.list_blobs()
# l=list(blobs)
# x=[y.name for y in l]
# c=x[1:]
# print(len(c))
# GCS paths to ingest. NOTE(review): in the full job this list is
# presumably built from bucket.list_blobs() (see the commented-out
# listing code above); here it is hard-coded to two sample chunks.
files = ['gs://mybucket/_chunk1',
         'gs://mybucket/_chunk0']
class DataIngestion:
    """A helper class which contains the logic to translate the file into
    a format BigQuery will accept.

    NOTE(review): the pattern below uses the (*SKIP)(*FAIL) backtracking
    verbs, which are supported by the third-party ``regex`` module but NOT
    by Python's built-in ``re`` — compiling it under ``re`` raises
    "nothing to repeat", which is the error discussed in the surrounding
    text.
    """

    def parse_method(self, string_input):
        """Split one CSV line into a 6-field dict for BigQuery.

        The regex splits on commas while treating any ``{...}`` group
        (the geometry field) as atomic, so commas inside braces are not
        split on.
        """
        x="""{}""".format(string_input)
        # Compiled per call; requires the `regex` module's verb support.
        rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")
        d = {}
        # Assumes exactly 6 comma-separated fields per line — TODO confirm
        # against the actual CSV layout.
        d['name'], d['date'], d['geometry'], d['value0'], d['value1'], d['value2']=rx.split(x)
        # Strip the surrounding double quotes from the geometry field.
        d['geometry']=d['geometry'].strip('"')
        return d
def run(argv=None):
    """Main entry point; defines and runs the pipeline.

    Reads every GCS path in ``files``, parses each CSV line into a dict
    via DataIngestion.parse_method, and appends the rows to a BigQuery
    table.
    """
    data_ingestion = DataIngestion()
    p = beam.Pipeline(options=PipelineOptions())
    (p
     # Materialize the file-path list as a PCollection of strings.
     | 'Create PCollection' >> beam.Create(files)
     # ReadAllFromText expands each path element into its text lines,
     # dropping the header line of each file.
     | 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)
     | 'String To BigQuery Row' >> beam.Map(lambda s:
         data_ingestion.parse_method(s))
     # NOTE(review): `myschema` is not defined anywhere in this snippet —
     # presumably omitted by the question's author; as written this line
     # raises NameError.
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
             'mytable',
             dataset='mydataset',
             schema=myschema,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
    result = p.run()
    # Block until the pipeline finishes.
    result.wait_until_finish()
# Script entry point: enable INFO logging and launch the pipeline.
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
问题在于,如果 files
列表只有一个元素,则此代码可以完美运行。一旦有超过 1 个元素,转换 'String To BigQuery Row' 就会出错并显示 error: nothing to repeat [while running 'String To BigQuery Row']
。这可能与正则表达式模块有关,但我无法弄清楚哪里出了问题,因为它在给定 1 个文件时可以完美运行。
编辑:奇怪的是它通过 DirectRunner 运行良好。我正在传递给定的 requirements.txt
文件 here.
这就是我执行管道的方式:
python streaming_inserts.py --runner=DataFlowRunner --project=my-project --temp_location=gs://temp/ --staging_location=gs://stage/ --requirements_file requirements.txt --disk_size_gb 1000 --region us-east1
我的 requirements.txt
看起来像这样:
regex
google-cloud-storage
此外,根据日志,正在安装软件包:
OP 的评论让我意识到我的错误：预期使用的库是 regex，而不是 Python 内置的 re。
使用 import regex as re
不仅让我感到困惑,而且还导致 re
库抛出 nothing to repeat
错误。这是因为默认情况下 Dataflow 不会保存您的主会话。
当您的解析函数中的代码正在执行时,它无法访问您在构建时导入的 re
的上下文。通常,这会因 NameError
而失败,但因为您使用的是有效的库名称,代码假定您指的是内置 re
库并尝试按原样执行它。
如果您改用 import regex
,您将看到 NameError: name 'regex' is not defined
,这是代码失败的真正原因。要解决这个问题,要么将 import 语句移动到解析函数本身,要么将 --save_main_session
作为选项传递给运行器。
有关详细信息,请参阅 here。
旧答案:
虽然我无法判断您使用的 Python 是哪个版本,但看来您对正则表达式的怀疑是正确的。
*
是一个特殊字符,表示重复前面的内容,但 (
是一个表示分组的特殊字符,因此像 (*SKIP)
这样的模式在语法上似乎不正确。
在 Python 3.7 中,上面的表达式甚至无法编译:
python -c 'import re; rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")'
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/re.py", line 234, in compile
return _compile(pattern, flags)
File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/re.py", line 286, in _compile
p = sre_compile.compile(pattern, flags)
File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_compile.py", line 764, in compile
p = sre_parse.parse(p, flags)
File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 930, in parse
p = _parse_sub(source, pattern, flags & SRE_FLAG_VERBOSE, 0)
File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 426, in _parse_sub
not nested and not items))
File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 816, in _parse
p = _parse_sub(source, state, sub_verbose, nested + 1)
File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 426, in _parse_sub
not nested and not items))
File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 651, in _parse
source.tell() - here + len(this))
re.error: nothing to repeat at position 11
Python 2.7.15也不接受:
python2 -c 'import re; rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")'
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/usr/lib/python2.7/re.py", line 194, in compile
return _compile(pattern, flags)
File "/usr/lib/python2.7/re.py", line 251, in _compile
raise error, v # invalid expression
sre_constants.error: nothing to repeat
虽然我不知道您要匹配的字符串是什么，但我怀疑您的某些字符需要转义，例如："\{[^{}]+\}(\*SKIP)(\*FAIL)|,"
我有一个 GCS 存储桶，我试图从中读取大约 20 万个文件，然后将它们写入 BigQuery。问题是我在创建能与这段代码良好配合的 PCollection 时遇到了困难。我正在参考 this 教程。
我有这个代码:
from __future__ import absolute_import
import argparse
import logging
import os
from past.builtins import unicode
import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from google.cloud import storage
import regex as re
# storage_client = storage.Client()
# bucket = storage_client.get_bucket('mybucket')
#
# blobs = bucket.list_blobs()
# l=list(blobs)
# x=[y.name for y in l]
# c=x[1:]
# print(len(c))
# GCS paths to ingest. NOTE(review): in the full job this list is
# presumably built from bucket.list_blobs() (see the commented-out
# listing code above); here it is hard-coded to two sample chunks.
files = ['gs://mybucket/_chunk1',
         'gs://mybucket/_chunk0']
class DataIngestion:
    """A helper class which contains the logic to translate the file into
    a format BigQuery will accept.

    NOTE(review): the pattern below uses the (*SKIP)(*FAIL) backtracking
    verbs, which are supported by the third-party ``regex`` module but NOT
    by Python's built-in ``re`` — compiling it under ``re`` raises
    "nothing to repeat", which is the error discussed in the surrounding
    text.
    """

    def parse_method(self, string_input):
        """Split one CSV line into a 6-field dict for BigQuery.

        The regex splits on commas while treating any ``{...}`` group
        (the geometry field) as atomic, so commas inside braces are not
        split on.
        """
        x="""{}""".format(string_input)
        # Compiled per call; requires the `regex` module's verb support.
        rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")
        d = {}
        # Assumes exactly 6 comma-separated fields per line — TODO confirm
        # against the actual CSV layout.
        d['name'], d['date'], d['geometry'], d['value0'], d['value1'], d['value2']=rx.split(x)
        # Strip the surrounding double quotes from the geometry field.
        d['geometry']=d['geometry'].strip('"')
        return d
def run(argv=None):
    """Main entry point; defines and runs the pipeline.

    Reads every GCS path in ``files``, parses each CSV line into a dict
    via DataIngestion.parse_method, and appends the rows to a BigQuery
    table.
    """
    data_ingestion = DataIngestion()
    p = beam.Pipeline(options=PipelineOptions())
    (p
     # Materialize the file-path list as a PCollection of strings.
     | 'Create PCollection' >> beam.Create(files)
     # ReadAllFromText expands each path element into its text lines,
     # dropping the header line of each file.
     | 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)
     | 'String To BigQuery Row' >> beam.Map(lambda s:
         data_ingestion.parse_method(s))
     # NOTE(review): `myschema` is not defined anywhere in this snippet —
     # presumably omitted by the question's author; as written this line
     # raises NameError.
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
             'mytable',
             dataset='mydataset',
             schema=myschema,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
    result = p.run()
    # Block until the pipeline finishes.
    result.wait_until_finish()
# Script entry point: enable INFO logging and launch the pipeline.
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
问题在于,如果 files
列表只有一个元素,则此代码可以完美运行。一旦有超过 1 个元素,转换 'String To BigQuery Row' 就会出错并显示 error: nothing to repeat [while running 'String To BigQuery Row']
。这可能与正则表达式模块有关,但我无法弄清楚哪里出了问题,因为它在给定 1 个文件时可以完美运行。
编辑:奇怪的是它通过 DirectRunner 运行良好。我正在传递给定的 requirements.txt
文件 here.
这就是我执行管道的方式:
python streaming_inserts.py --runner=DataFlowRunner --project=my-project --temp_location=gs://temp/ --staging_location=gs://stage/ --requirements_file requirements.txt --disk_size_gb 1000 --region us-east1
我的 requirements.txt
看起来像这样:
regex
google-cloud-storage
此外,根据日志,正在安装软件包:
OP 的评论让我意识到我的错误：预期使用的库是 regex，而不是 Python 内置的 re。
使用 import regex as re
不仅让我感到困惑,而且还导致 re
库抛出 nothing to repeat
错误。这是因为默认情况下 Dataflow 不会保存您的主会话。
当您的解析函数中的代码正在执行时,它无法访问您在构建时导入的 re
的上下文。通常,这会因 NameError
而失败,但因为您使用的是有效的库名称,代码假定您指的是内置 re
库并尝试按原样执行它。
如果您改用 import regex
,您将看到 NameError: name 'regex' is not defined
,这是代码失败的真正原因。要解决这个问题,要么将 import 语句移动到解析函数本身,要么将 --save_main_session
作为选项传递给运行器。
有关详细信息,请参阅 here。
旧答案:
虽然我无法判断您使用的 Python 是哪个版本,但看来您对正则表达式的怀疑是正确的。
*
是一个特殊字符,表示重复前面的内容,但 (
是一个表示分组的特殊字符,因此像 (*SKIP)
这样的模式在语法上似乎不正确。
在 Python 3.7 中,上面的表达式甚至无法编译:
python -c 'import re; rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")'
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/re.py", line 234, in compile
return _compile(pattern, flags)
File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/re.py", line 286, in _compile
p = sre_compile.compile(pattern, flags)
File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_compile.py", line 764, in compile
p = sre_parse.parse(p, flags)
File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 930, in parse
p = _parse_sub(source, pattern, flags & SRE_FLAG_VERBOSE, 0)
File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 426, in _parse_sub
not nested and not items))
File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 816, in _parse
p = _parse_sub(source, state, sub_verbose, nested + 1)
File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 426, in _parse_sub
not nested and not items))
File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 651, in _parse
source.tell() - here + len(this))
re.error: nothing to repeat at position 11
Python 2.7.15也不接受:
python2 -c 'import re; rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")'
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/usr/lib/python2.7/re.py", line 194, in compile
return _compile(pattern, flags)
File "/usr/lib/python2.7/re.py", line 251, in _compile
raise error, v # invalid expression
sre_constants.error: nothing to repeat
虽然我不知道您要匹配的字符串是什么，但我怀疑您的某些字符需要转义，例如："\{[^{}]+\}(\*SKIP)(\*FAIL)|,"