如何为 apache beam 数据流的输出 csv 添加 headers?
How do I add headers for the output csv for apache beam dataflow?
我注意到在java sdk中,有一个函数可以让你写一个csv文件的headers。
https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/TextIO.Write.html#withHeader-java.lang.String-
此功能是否反映在 python skd 上?
PythonSDK
中尚不存在此功能
目前还没有实现。但是,您可以自己 implement/extend(请参阅 attached notebook 示例+使用我的 apache_beam 版本进行测试)。
这是基于超classFileSink
的一个note in the docstring,提到你应该覆盖open
函数:
适用于我的 apache_beam 版本的新 class ('0.3.0-incubating.dev'):
import apache_beam as beam
from apache_beam.io import TextFileSink
from apache_beam.io.fileio import ChannelFactory,CompressionTypes
from apache_beam import coders
class TextFileSinkWithHeader(TextFileSink):
def __init__(self,
file_path_prefix,
file_name_suffix='',
append_trailing_newlines=True,
num_shards=0,
shard_name_template=None,
coder=coders.ToStringCoder(),
compression_type=CompressionTypes.NO_COMPRESSION,
header=None):
super(TextFileSinkWithHeader, self).__init__(
file_path_prefix,
file_name_suffix=file_name_suffix,
num_shards=num_shards,
shard_name_template=shard_name_template,
coder=coder,
compression_type=compression_type,
append_trailing_newlines=append_trailing_newlines)
self.header = header
def open(self, temp_path):
channel_factory = ChannelFactory.open(
temp_path,
'wb',
mime_type=self.mime_type)
channel_factory.write(self.header+"\n")
return channel_factory
您随后可以按如下方式使用它:
beam.io.Write(TextFileSinkWithHeader('./names_w_headers',header="names"))
有关完整概述,请参阅 the notebook。
您现在可以写入文本并使用文本接收器指定 header。
来自文档:
class apache_beam.io.textio.WriteToText(file_path_prefix, file_name_suffix='', append_trailing_newlines=True, num_shards=0, shard_name_template=None, coder=ToStringCoder, compression_type='auto', header=None)
所以你可以使用下面这段代码:
beam.io.WriteToText(bucket_name, file_name_suffix='.csv', header='colname1, colname2')
如果您需要详细信息或查看其实现方式,可在此处获取完整文档:https://beam.apache.org/documentation/sdks/pydoc/2.0.0/_modules/apache_beam/io/textio.html#WriteToText
对于 Python SDK:
beam.io.Write(beam.io.WriteToText(
file_path_prefix=os.path.join(OUTPUT_DIR),
file_name_suffix='.csv', header='colname1, colname2')
)
我注意到在java sdk中,有一个函数可以让你写一个csv文件的headers。 https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/TextIO.Write.html#withHeader-java.lang.String-
此功能是否反映在 python skd 上?
PythonSDK
中尚不存在此功能目前还没有实现。但是,您可以自己 implement/extend(请参阅 attached notebook 示例+使用我的 apache_beam 版本进行测试)。
这是基于超classFileSink
的一个note in the docstring,提到你应该覆盖open
函数:
适用于我的 apache_beam 版本的新 class ('0.3.0-incubating.dev'):
import apache_beam as beam
from apache_beam.io import TextFileSink
from apache_beam.io.fileio import ChannelFactory,CompressionTypes
from apache_beam import coders
class TextFileSinkWithHeader(TextFileSink):
def __init__(self,
file_path_prefix,
file_name_suffix='',
append_trailing_newlines=True,
num_shards=0,
shard_name_template=None,
coder=coders.ToStringCoder(),
compression_type=CompressionTypes.NO_COMPRESSION,
header=None):
super(TextFileSinkWithHeader, self).__init__(
file_path_prefix,
file_name_suffix=file_name_suffix,
num_shards=num_shards,
shard_name_template=shard_name_template,
coder=coder,
compression_type=compression_type,
append_trailing_newlines=append_trailing_newlines)
self.header = header
def open(self, temp_path):
channel_factory = ChannelFactory.open(
temp_path,
'wb',
mime_type=self.mime_type)
channel_factory.write(self.header+"\n")
return channel_factory
您随后可以按如下方式使用它:
beam.io.Write(TextFileSinkWithHeader('./names_w_headers',header="names"))
有关完整概述,请参阅 the notebook。
您现在可以写入文本并使用文本接收器指定 header。
来自文档:
class apache_beam.io.textio.WriteToText(file_path_prefix, file_name_suffix='', append_trailing_newlines=True, num_shards=0, shard_name_template=None, coder=ToStringCoder, compression_type='auto', header=None)
所以你可以使用下面这段代码:
beam.io.WriteToText(bucket_name, file_name_suffix='.csv', header='colname1, colname2')
如果您需要详细信息或查看其实现方式,可在此处获取完整文档:https://beam.apache.org/documentation/sdks/pydoc/2.0.0/_modules/apache_beam/io/textio.html#WriteToText
对于 Python SDK:
beam.io.Write(beam.io.WriteToText(
file_path_prefix=os.path.join(OUTPUT_DIR),
file_name_suffix='.csv', header='colname1, colname2')
)