ImportError: No module named language_v1.gapic when running Dataflow job

I am building a Dataflow job that reads data from Cloud Storage, passes it to the NLP API for sentiment analysis, and writes the results to BigQuery.

The job runs successfully locally (when I do not use the Dataflow runner):

import apache_beam as beam
import logging
from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types


PROJECT = 'ProjectName'
schema = 'name:STRING,id:STRING,date:STRING,title:STRING,text:STRING,magnitude:STRING,score:STRING'
src_path = "gs://amazoncustreview/sentimentSource.csv"


class Sentiment(beam.DoFn):
    def process(self, element):
        element = element.split(",")
        client = language.LanguageServiceClient()
        document = types.Document(content=element[2],
                                  type=enums.Document.Type.PLAIN_TEXT)
        sentiment = client.analyze_sentiment(document).document_sentiment
        return [{
            'name': element[0],
            'title': element[1],
            'magnitude': sentiment.magnitude,
            'score': sentiment.score
        }]


def main():
    BUCKET = 'bucket-name'
    argv = [
      '--project={0}'.format(PROJECT),
      '--staging_location=gs://{0}/staging/'.format(BUCKET),
      '--temp_location=gs://{0}/staging/'.format(BUCKET),
      '--runner=DataflowRunner',
      '--job_name=examplejob2',
      '--save_main_session'
    ]
    p = beam.Pipeline(argv=argv)

    (p
     | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
     | 'ParseCSV' >> beam.ParDo(Sentiment())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           '{0}:Dataset.table'.format(PROJECT),
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    p.run()


if __name__ == '__main__':
    main()

This is the error I get. I have tried importing different versions of Google Cloud Language, but all my attempts have failed.

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 773, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 489, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 280, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python2.7/dist-packages/dill/_dill.py", line 410, in load_session
    module = unpickler.load()
  File "/usr/lib/python2.7/pickle.py", line 864, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1139, in load_reduce
    value = func(*args)
  File "/usr/local/lib/python2.7/dist-packages/dill/_dill.py", line 828, in _import_module
    return getattr(__import__(module, None, None, [obj]), obj)
ImportError: No module named language_v1.gapic

This looks like a version mismatch with the google-cloud-language package installed on the Dataflow workers. To fix it, create a requirements.txt file and add, for example, google-cloud-language==1.3.0.

Then add '--requirements_file=requirements.txt' to your pipeline's options.
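As a sketch, the change amounts to one extra entry in the existing argv list (the 1.3.0 pin is the example version from above; PROJECT and BUCKET stand in for your own values):

```python
# requirements.txt, staged next to the pipeline code, would contain:
#   google-cloud-language==1.3.0

PROJECT = 'ProjectName'
BUCKET = 'bucket-name'

argv = [
    '--project={0}'.format(PROJECT),
    '--staging_location=gs://{0}/staging/'.format(BUCKET),
    '--temp_location=gs://{0}/staging/'.format(BUCKET),
    '--runner=DataflowRunner',
    '--job_name=examplejob2',
    '--save_main_session',
    # New: Dataflow pip-installs these packages on every worker before
    # unpickling the main session, so the import can be resolved there.
    '--requirements_file=requirements.txt',
]
```

With --save_main_session the workers unpickle your module-level state, so every package imported at module level must also be installable on the workers, which is exactly what --requirements_file provides.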

I tested it with this code and it worked for me: