来自 AINotebook/Jupyter 的 Apache Beam/GCP 数据流 运行

Apache Beam/GCP Dataflow running from AINotebook/Jupyter

我们最近将我们的基础设施迁移到了 GCP,我们热衷于使用 DataProc(Spark) 和 DataFlow(Apache Beam)数据管道。 Dataproc 可以很直接地使其工作,但是 运行ning Dataflow 让我们头疼:

我们如何运行来自 JupyterNotebook 的数据流作业(如 AI 笔记本)

例子如下,我有一个巨大的数据集,我想grou_by,然后做一个过滤器和一些计算,然后它应该在特定的桶中写入一个对象(现在这段代码,我不知道如何删除存储桶,而不是做一些有用的事情)

import datetime, os

def preprocess(in_test_mode):
    import shutil, os, subprocess
    job_name = 'hola'

    if in_test_mode:
        print('Launching local job ... hang on')
        OUTPUT_DIR = './preproc'
        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
        os.makedirs(OUTPUT_DIR)
    else:
        print('Launching Dataflow job {} ... hang on'.format(job_name))
        OUTPUT_DIR = 'gs://experimentos-con-humanos/'.format(BUCKET)
        try:
            subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
        except:
            pass

    options = {
      'staging_location': os.path.join(OUTPUT_DIR, 'temp'),
      'temp_location': os.path.join(OUTPUT_DIR, 'temp'),
      'job_name': job_name,
      'project': PROJECT,
      'region': REGION,
      'teardown_policy': 'TEARDOWN_ALWAYS',
      'no_save_main_session': True,
      'max_num_workers': 6
    }
    opts = beam.pipeline.PipelineOptions(flags = [], **options)

    if in_test_mode:
        RUNNER = 'DataflowRunner'
    else:
        RUNNER = 'DataflowRunner'

    p = beam.Pipeline(RUNNER, options = opts)
    (p 
         | 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(table_spec))    
         | 'hashAsKey' >> beam.Map(lambda r: (r['afi_hash'], r))
         | 'Transpose' >> beam.GroupByKey()
         | 'Filtro menos de 12' >> beam.Filter(lambda r: len(r[1]) >= 12 )    
         | 'calculos' >> beam.Map(calculos)
            #| 'Group and sum' >> beam.
            #| 'Format results' >> beam.
         | 'Write results' >> beam.Map(lambda r: print(r))
         | '{}_out'.format(1) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(1))))
        )

    job = p.run()
    if in_test_mode:
        job.wait_until_finish()
        print("Done!")

preprocess(in_test_mode = False)

1) 它不起作用,但是 运行 它起作用了! 2) 如果我将 'DataflowRunner' 更改为 'DirectRunner',则该代码有效,这意味着它在本地有效 3) 如果我不更改它,作业将不会出现在数据流中,相反,它会删除它工作的 GCP 存储桶

PD:我确实拥有存储、数据流和 BigQuery 的管理员权限 PD2:table 确实存在,并且我有 cuadruple 检查它是否具有确切名称的 Bucket PD3:我想让它在 Jupyter Notebook 上运行,但如果有人想知道就没有必要了

正如评论中所说,问题似乎在预处理部分。特别是,这部分在本地工作或使用 DataflowRunner:

时执行不同
if in_test_mode:
    print('Launching local job ... hang on')
    OUTPUT_DIR = './preproc'
    shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
    os.makedirs(OUTPUT_DIR)
else:
    print('Launching Dataflow job {} ... hang on'.format(job_name))
    OUTPUT_DIR = 'gs://experimentos-con-humanos/'.format(BUCKET)
    try:
        subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
    except:
        pass

似乎负责删除存储桶内容(用于输出、临时文件等)。另请注意,在该示例中,您实际上并未将 BUCKET 添加到 OUTPUT_DIR.