ImportError: No module named language_v1.gapic when running Dataflow job
ImportError: No module named language_v1.gapic when running Dataflow job
我正在构建 Dataflow 作业以从云存储中获取数据并将其传递给 NLP API 以执行情绪分析并将结果导入 BigQuery
本地作业运行成功(我没有使用数据流运行器)
import apache_beam as beam
import logging
from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types
# Google Cloud project that owns the Dataflow job and the BigQuery dataset.
PROJECT = 'ProjectName'
# BigQuery table schema (every column declared as STRING).
schema = 'name : STRING, id : STRING, date : STRING,title : STRING, text: STRING,magnitude : STRING, score : STRING'
# Input CSV in Cloud Storage holding the reviews to analyse.
src_path = "gs://amazoncustreview/sentimentSource.csv"
class Sentiment(beam.DoFn):
    """Annotate CSV rows with sentiment scores from the Cloud NLP API.

    Each element is expected to be one comma-separated CSV line whose
    third field is the text to analyse.  Emits one dict per row with the
    keys name, title, magnitude and score.
    """

    def start_bundle(self):
        # Create the API client once per bundle instead of once per
        # element: constructing LanguageServiceClient opens a gRPC
        # channel, which is far too expensive to repeat for every row.
        self._client = language.LanguageServiceClient()

    def process(self, element):
        # NOTE(review): a plain split(",") breaks on quoted fields that
        # contain commas; switch to the csv module if the data needs it.
        fields = element.split(",")
        document = types.Document(content=fields[2],
                                  type=enums.Document.Type.PLAIN_TEXT)
        sentiment = self._client.analyze_sentiment(document).document_sentiment
        return [{
            'name': fields[0],
            'title': fields[1],
            'magnitude': sentiment.magnitude,
            'score': sentiment.score,
        }]
def main():
    """Build and launch the Dataflow sentiment-analysis pipeline.

    Reads CSV lines from Cloud Storage, scores each row's text with the
    Cloud Natural Language API, and appends the results to BigQuery.
    """
    BUCKET = 'BucKet name'
    argv = [
        '--project={0}'.format(PROJECT),
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner',
        '--job_name=examplejob2',
        '--save_main_session',
        # Ship a pinned google-cloud-language (e.g.
        # google-cloud-language==1.3.0 in requirements.txt) to the
        # workers; without it, the version preinstalled on the worker
        # image mismatches the pickled session and the job dies with
        # "ImportError: No module named language_v1.gapic".
        '--requirements_file=requirements.txt',
    ]
    p = beam.Pipeline(argv=argv)
    (p
     | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
     | 'ParseCSV' >> beam.ParDo(Sentiment())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         '{0}:Dataset.table'.format(PROJECT),
         # `schema` was defined at module level but never passed in;
         # without it BigQuery cannot create the table if it is missing.
         schema=schema,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    p.run()


if __name__ == '__main__':
    main()
这是我得到的错误。我尝试导入不同版本的 Google Cloud Language，但我的所有尝试都失败了。
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 773, in run
self._load_main_session(self.local_staging_directory)
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 489, in _load_main_session
pickler.load_session(session_file)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 280, in load_session
return dill.load_session(file_path)
File "/usr/local/lib/python2.7/dist-packages/dill/_dill.py", line 410, in load_session
module = unpickler.load()
File "/usr/lib/python2.7/pickle.py", line 864, in load
dispatch[key](self)
File "/usr/lib/python2.7/pickle.py", line 1139, in load_reduce
value = func(*args)
File "/usr/local/lib/python2.7/dist-packages/dill/_dill.py", line 828, in _import_module
return getattr(__import__(module, None, None, [obj]), obj)
ImportError: No module named language_v1.gapic
这似乎与 Dataflow worker 中安装的 google-cloud-language
版本不匹配。要解决它,请创建一个 requirements.txt
文件并添加 google-cloud-language==1.3.0
例如。
然后,将 '--requirements_file=requirements.txt'
添加到管道的选项参数中。
我用这段代码测试了它，它对我有用：
我正在构建 Dataflow 作业以从云存储中获取数据并将其传递给 NLP API 以执行情绪分析并将结果导入 BigQuery
本地作业运行成功(我没有使用数据流运行器)
import apache_beam as beam
import logging
from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types
# Google Cloud project that owns the Dataflow job and the BigQuery dataset.
PROJECT = 'ProjectName'
# BigQuery table schema (every column declared as STRING).
schema = 'name : STRING, id : STRING, date : STRING,title : STRING, text: STRING,magnitude : STRING, score : STRING'
# Input CSV in Cloud Storage holding the reviews to analyse.
src_path = "gs://amazoncustreview/sentimentSource.csv"
class Sentiment(beam.DoFn):
    """Annotate CSV rows with sentiment scores from the Cloud NLP API.

    Each element is expected to be one comma-separated CSV line whose
    third field is the text to analyse.  Emits one dict per row with the
    keys name, title, magnitude and score.
    """

    def start_bundle(self):
        # Create the API client once per bundle instead of once per
        # element: constructing LanguageServiceClient opens a gRPC
        # channel, which is far too expensive to repeat for every row.
        self._client = language.LanguageServiceClient()

    def process(self, element):
        # NOTE(review): a plain split(",") breaks on quoted fields that
        # contain commas; switch to the csv module if the data needs it.
        fields = element.split(",")
        document = types.Document(content=fields[2],
                                  type=enums.Document.Type.PLAIN_TEXT)
        sentiment = self._client.analyze_sentiment(document).document_sentiment
        return [{
            'name': fields[0],
            'title': fields[1],
            'magnitude': sentiment.magnitude,
            'score': sentiment.score,
        }]
def main():
    """Build and launch the Dataflow sentiment-analysis pipeline.

    Reads CSV lines from Cloud Storage, scores each row's text with the
    Cloud Natural Language API, and appends the results to BigQuery.
    """
    BUCKET = 'BucKet name'
    argv = [
        '--project={0}'.format(PROJECT),
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner',
        '--job_name=examplejob2',
        '--save_main_session',
        # Ship a pinned google-cloud-language (e.g.
        # google-cloud-language==1.3.0 in requirements.txt) to the
        # workers; without it, the version preinstalled on the worker
        # image mismatches the pickled session and the job dies with
        # "ImportError: No module named language_v1.gapic".
        '--requirements_file=requirements.txt',
    ]
    p = beam.Pipeline(argv=argv)
    (p
     | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
     | 'ParseCSV' >> beam.ParDo(Sentiment())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         '{0}:Dataset.table'.format(PROJECT),
         # `schema` was defined at module level but never passed in;
         # without it BigQuery cannot create the table if it is missing.
         schema=schema,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    p.run()


if __name__ == '__main__':
    main()
这是我得到的错误。我尝试导入不同版本的 Google Cloud Language，但我的所有尝试都失败了。
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 773, in run
self._load_main_session(self.local_staging_directory)
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 489, in _load_main_session
pickler.load_session(session_file)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 280, in load_session
return dill.load_session(file_path)
File "/usr/local/lib/python2.7/dist-packages/dill/_dill.py", line 410, in load_session
module = unpickler.load()
File "/usr/lib/python2.7/pickle.py", line 864, in load
dispatch[key](self)
File "/usr/lib/python2.7/pickle.py", line 1139, in load_reduce
value = func(*args)
File "/usr/local/lib/python2.7/dist-packages/dill/_dill.py", line 828, in _import_module
return getattr(__import__(module, None, None, [obj]), obj)
ImportError: No module named language_v1.gapic
这似乎与 Dataflow worker 中安装的 google-cloud-language
版本不匹配。要解决它,请创建一个 requirements.txt
文件并添加 google-cloud-language==1.3.0
例如。
然后,将 '--requirements_file=requirements.txt'
添加到管道的选项参数中。
我用这段代码测试了它，它对我有用：