Dataflow Python SDK Avro Source/Sink

I would like to ingest and write Avro files in GCS using the Python SDK. Does the Python SDK currently support Avro? If so, how do I do this? I saw a TODO comment about this in the source code, so I'm not too optimistic.

You are correct: the Python SDK does not support this yet, but it soon will.

As of version 2.6.0 of the Apache Beam/Dataflow Python SDK, it is indeed possible to read (and write) Avro files in GCS.

Even better, Beam's Python SDK now supports fastavro reads and writes, which are up to 10x faster than regular Avro IO.
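(For context, the `.avsc` schema file loaded in the pipeline below is plain JSON. A minimal, hypothetical schema, just to illustrate the format, can be handled with the standard library alone:)

```python
import json

# a minimal, hypothetical Avro record schema -- your real .avsc will differ
schema = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "email", "type": ["null", "string"], "default": None},
    ],
}

# an .avsc file is simply this JSON serialized to disk:
avsc_text = json.dumps(schema, indent=2)
parsed = json.loads(avsc_text)
print(parsed["name"])                          # record name: User
print([f["name"] for f in parsed["fields"]])   # field names: ['id', 'email']
```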

Example code:

import apache_beam as beam
from apache_beam.io import ReadFromAvro
from apache_beam.io import WriteToAvro
import avro.schema


RUNNER = 'DataflowRunner'
GCP_PROJECT_ID = 'YOUR_PROJECT_ID'
BUCKET_NAME = 'YOUR_BUCKET_HERE'
STAGING_LOCATION = 'gs://{}/staging'.format(BUCKET_NAME)
TEMP_LOCATION = 'gs://{}/temp'.format(BUCKET_NAME)
GCS_INPUT = 'gs://{}/input-*.avro'.format(BUCKET_NAME)
GCS_OUTPUT = 'gs://{}/output'.format(BUCKET_NAME)  # used as the output file prefix
JOB_NAME = 'conversion-test'

SCHEMA_PATH = "YOUR_AVRO_SCHEMA.avsc"
AVRO_SCHEMA = avro.schema.parse(open(SCHEMA_PATH).read())

OPTIONS = {
    'runner': RUNNER,
    'job_name': JOB_NAME,
    'staging_location': STAGING_LOCATION,
    'temp_location': TEMP_LOCATION,
    'project': GCP_PROJECT_ID,
    'max_num_workers': 2,
    'save_main_session': True,
}

PIPELINE = beam.Pipeline(options=beam.pipeline.PipelineOptions(flags=[], **OPTIONS))


def main():
    # note: `use_fastavro` must be set explicitly to enable `fastavro`:
    results = PIPELINE | ReadFromAvro(file_pattern=GCS_INPUT, use_fastavro=True)
    results | WriteToAvro(file_path_prefix=GCS_OUTPUT, schema=AVRO_SCHEMA, use_fastavro=True)
    # nothing happens until the pipeline is actually run:
    PIPELINE.run().wait_until_finish()


if __name__ == '__main__':
    import os
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'PATH_TO_YOUR_SERVICE_ACCOUNT_KEY'
    main()
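A side note on the `OPTIONS` dict: passing it as keyword arguments to `PipelineOptions` is equivalent to passing the same values as command-line flags. A stdlib-only sketch of that mapping, assuming simple `--key=value` flags (boolean flags such as `save_main_session` are instead passed bare, e.g. `--save_main_session`):

```python
def options_to_flags(options):
    """Convert an options dict into the equivalent --key=value flag list.

    Boolean options are emitted as bare flags when True, matching
    argparse-style store_true flags.
    """
    flags = []
    for key, value in options.items():
        if isinstance(value, bool):
            if value:
                flags.append('--{}'.format(key))
        else:
            flags.append('--{}={}'.format(key, value))
    return flags


# hypothetical subset of the OPTIONS dict above:
example = {
    'runner': 'DataflowRunner',
    'job_name': 'conversion-test',
    'max_num_workers': 2,
    'save_main_session': True,
}
flags = options_to_flags(example)
print(flags)
```

Either form (the flags list or the keyword arguments) can be handed to `PipelineOptions`; the keyword form used in the example above is simply more convenient in a script.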