从 CSV 文件读取并上传到 Google 数据存储 Python
Read from CSV file and upload to Google Data Store Python
我的要求是从 csv 文件中读取数据以及 header 并使用 Python 和 Dataflow 在 Google Dat a Store 中创建相同的结构。我曾尝试创建如下示例代码。
我的 CSV 样本如下,
First Name,Last Name,Date of Birth
Tom,Cruise,"July 3, 1962"
Bruce,Willis,"March 19, 1955"
Morgan,Freeman,"June 1, 1937"
John,Wayne,"May 26, 1907"
我的pyhton 2.7代码片段如下
import csv
import datetime
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper
from apache_beam.io.filesystems import FileSystems
from apache_beam import pvalue
class CSVtoDict(beam.DoFn):
"""Converts line into dictionary"""
def process(self, element, header):
rec = ""
element = element.encode('utf-8')
try:
for line in csv.reader([element]):
rec = line
if len(rec) == len(header):
data = {header.strip(): val.strip() for header, val in zip(header, rec)}
return [data]
else:
logging.info("row contains bad data")
except Exception:
pass
class CreateEntities(beam.DoFn):
"""Creates Datastore entity"""
def process(self, element):
entity = entity_pb2.Entity()
sku = int(element.pop('sku'))
element[1] = float(element[1])
element['salePrice'] = float(element['salePrice'])
element['name'] = unicode(element['name'].decode('utf-8'))
element['type'] = unicode(element['type'].decode('utf-8'))
element['url'] = unicode(element['url'].decode('utf-8'))
element['image'] = unicode(element['image'].decode('utf-8'))
element['inStoreAvailability'] = unicode(element['inStoreAvailability'])
datastore_helper.add_key_path(entity.key, 'Productx', sku)
datastore_helper.add_properties(entity, element)
return [entity]
class ProcessOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--input',
dest='input',
type=str,
required=False,
help='Input file to read. This can be a local file or a file in a Google Storage Bucket.')
def read_header_from_filename(filename):
# note that depending on your newline character/file encoding, this may need to be modified
file_handle = FileSystems.open(filename)
header = file_handle.readline()
return header.split(',')
process_options = PipelineOptions().view_as(ProcessOptions)
p = beam.Pipeline(options=process_options)
# Create PCollection containing header line
header = (p
| beam.Create(process_options.input)
| beam.Map(read_header_from_filename))
def dataflow(argv=None):
process_options = PipelineOptions().view_as(ProcessOptions)
p = beam.Pipeline(options=process_options)
(p
| 'Reading input file' >> beam.io.ReadFromText(process_options.input)
| 'Converting from csv to dict' >> beam.ParDo(CSVtoDict(), pvalue.AsSingleton(header))
| 'Create entities' >> beam.ParDo(CreateEntities())
| 'Write entities into Datastore' >> WriteToDatastore('isc-am-poc')
)
p.run().wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
dataflow()
我可以使用数据流加载实体,但是我想从 Header 解析 CSV 文件 ,然后解析行 ,而不是对 [= 中的值进行硬编码38=] 创建实体并在数据存储实体中写入相同的内容。
基本上上传相同的 CSV 文件,该文件作为包含行的数据流作业的输入。有人可以帮忙吗?
Required Output in Data Store for Key Actor:
First Name Last Name Date of Birth
Tom,Cruise "July 3, 1962"
Bruce,Willis "March 19, 1955"
Morgan,Freeman "June 1, 1937"
John,Wayne "May 26, 1907"
编辑:我合并了你给出的代码,现在出现以下错误。我正在使用 Python 2.7 并导入了相应的 libraries.Sorry 我是 Python.
的新手
Error:
File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
"__main__", fname, loader, pkg_name)
File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
exec code in run_globals
File "/home/gurusankar_p/upload-data-datastore-dataflow/upload2.py", line 70, in <module>
| beam.Map(read_header_from_filename))
File "/usr/local/lib/python2.7/dist-packages/apache_beam/transforms/core.py", line 2423, in __init__
self.values = tuple(values)
TypeError: 'RuntimeValueProvider' object is not iterable
谢谢,GS
Apache Beam 通过将文件的读取拆分到多个工作线程来并行处理您的数据,这意味着大多数工作线程根本不会读取 header 行。
您要做的是加入 用header 行读取的行。由于 header 行是少量数据,您可以将其作为单独的 PCollection 读入,并将其作为侧输入传递给 CSVtoDict.
阅读您的 header 行的一些示例代码:
def read_header_from_filename(filename):
# note that depending on your newline character/file encoding, this may need to be modified
file_handle = FileSystems.open(filename)
header = file_handle.readline()
return header.split(',')
# Create PCollection containing header line
header = (p
| beam.Create(process_options.input)
| beam.Map(read_header_from_filename))
您的管道构造代码变为:
(p
| 'Reading input file' >> beam.io.ReadFromText(process_options.input)
| 'Converting from csv to dict' >> beam.ParDo(CSVtoDict(), pvalue.AsSingleton(header))
| 'Create entities' >> beam.ParDo(CreateEntities())
| 'Write entities into Datastore' >> WriteToDatastore('isc-am-poc')
)
p.run().wait_until_finish()
我的要求是从 csv 文件中读取数据以及 header 并使用 Python 和 Dataflow 在 Google Dat a Store 中创建相同的结构。我曾尝试创建如下示例代码。
我的 CSV 样本如下,
First Name,Last Name,Date of Birth
Tom,Cruise,"July 3, 1962"
Bruce,Willis,"March 19, 1955"
Morgan,Freeman,"June 1, 1937"
John,Wayne,"May 26, 1907"
我的pyhton 2.7代码片段如下
import csv
import datetime
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper
from apache_beam.io.filesystems import FileSystems
from apache_beam import pvalue
class CSVtoDict(beam.DoFn):
"""Converts line into dictionary"""
def process(self, element, header):
rec = ""
element = element.encode('utf-8')
try:
for line in csv.reader([element]):
rec = line
if len(rec) == len(header):
data = {header.strip(): val.strip() for header, val in zip(header, rec)}
return [data]
else:
logging.info("row contains bad data")
except Exception:
pass
class CreateEntities(beam.DoFn):
"""Creates Datastore entity"""
def process(self, element):
entity = entity_pb2.Entity()
sku = int(element.pop('sku'))
element[1] = float(element[1])
element['salePrice'] = float(element['salePrice'])
element['name'] = unicode(element['name'].decode('utf-8'))
element['type'] = unicode(element['type'].decode('utf-8'))
element['url'] = unicode(element['url'].decode('utf-8'))
element['image'] = unicode(element['image'].decode('utf-8'))
element['inStoreAvailability'] = unicode(element['inStoreAvailability'])
datastore_helper.add_key_path(entity.key, 'Productx', sku)
datastore_helper.add_properties(entity, element)
return [entity]
class ProcessOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--input',
dest='input',
type=str,
required=False,
help='Input file to read. This can be a local file or a file in a Google Storage Bucket.')
def read_header_from_filename(filename):
# note that depending on your newline character/file encoding, this may need to be modified
file_handle = FileSystems.open(filename)
header = file_handle.readline()
return header.split(',')
process_options = PipelineOptions().view_as(ProcessOptions)
p = beam.Pipeline(options=process_options)
# Create PCollection containing header line
header = (p
| beam.Create(process_options.input)
| beam.Map(read_header_from_filename))
def dataflow(argv=None):
process_options = PipelineOptions().view_as(ProcessOptions)
p = beam.Pipeline(options=process_options)
(p
| 'Reading input file' >> beam.io.ReadFromText(process_options.input)
| 'Converting from csv to dict' >> beam.ParDo(CSVtoDict(), pvalue.AsSingleton(header))
| 'Create entities' >> beam.ParDo(CreateEntities())
| 'Write entities into Datastore' >> WriteToDatastore('isc-am-poc')
)
p.run().wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
dataflow()
我可以使用数据流加载实体,但是我想从 Header 解析 CSV 文件 ,然后解析行 ,而不是对 [= 中的值进行硬编码38=] 创建实体并在数据存储实体中写入相同的内容。
基本上上传相同的 CSV 文件,该文件作为包含行的数据流作业的输入。有人可以帮忙吗?
Required Output in Data Store for Key Actor:
First Name Last Name Date of Birth
Tom,Cruise "July 3, 1962"
Bruce,Willis "March 19, 1955"
Morgan,Freeman "June 1, 1937"
John,Wayne "May 26, 1907"
编辑:我合并了你给出的代码,现在出现以下错误。我正在使用 Python 2.7 并导入了相应的 libraries.Sorry 我是 Python.
的新手Error:
File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
"__main__", fname, loader, pkg_name)
File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
exec code in run_globals
File "/home/gurusankar_p/upload-data-datastore-dataflow/upload2.py", line 70, in <module>
| beam.Map(read_header_from_filename))
File "/usr/local/lib/python2.7/dist-packages/apache_beam/transforms/core.py", line 2423, in __init__
self.values = tuple(values)
TypeError: 'RuntimeValueProvider' object is not iterable
谢谢,GS
Apache Beam 通过将文件的读取拆分到多个工作线程来并行处理您的数据,这意味着大多数工作线程根本不会读取 header 行。
您要做的是加入 用header 行读取的行。由于 header 行是少量数据,您可以将其作为单独的 PCollection 读入,并将其作为侧输入传递给 CSVtoDict.
阅读您的 header 行的一些示例代码:
def read_header_from_filename(filename):
# note that depending on your newline character/file encoding, this may need to be modified
file_handle = FileSystems.open(filename)
header = file_handle.readline()
return header.split(',')
# Create PCollection containing header line
header = (p
| beam.Create(process_options.input)
| beam.Map(read_header_from_filename))
您的管道构造代码变为:
(p
| 'Reading input file' >> beam.io.ReadFromText(process_options.input)
| 'Converting from csv to dict' >> beam.ParDo(CSVtoDict(), pvalue.AsSingleton(header))
| 'Create entities' >> beam.ParDo(CreateEntities())
| 'Write entities into Datastore' >> WriteToDatastore('isc-am-poc')
)
p.run().wait_until_finish()