如何通过 apache airflow 处理 Google 存储中的数据?
How can I process data in Google storage via apache airflow?
我在 google 云存储中有一个 CSV 文件。我正在使用 google cloud composer 来 运行 apache airflow。我想在我的 CSV 文件上 运行 一些 bash 脚本并将其存储回 google 云存储?我尝试搜索各种运算符,但找不到任何处理 google 存储中文件的运算符。有什么办法可以做到这一点?
提前致谢。
这是一个例子:
bash_operator.BashOperator(
task_id="process_csv",
bash_command="gsutil cp gs://your_bucket/your_file.csv your_file.csv && "
"process_file your_file.csv > processed_file.csv && "
"gsutil cp processed_file.csv gs://your_bucket/processed_file.csv",
execution_timeout=timedelta(hours=1),
dag=dag
)
您可以在此存储库中找到更多示例 https://github.com/blockchain-etl/bitcoin-etl-airflow/blob/develop/dags/bitcoinetl/build_export_dag.py。
您也可以使用 PythonOperator 代替 BashOperator。可以在此处找到一些示例 https://github.com/blockchain-etl/ethereum-etl-airflow/blob/master/dags/export_dag.py
我在 google 云存储中有一个 CSV 文件。我正在使用 google cloud composer 来 运行 apache airflow。我想在我的 CSV 文件上 运行 一些 bash 脚本并将其存储回 google 云存储?我尝试搜索各种运算符,但找不到任何处理 google 存储中文件的运算符。有什么办法可以做到这一点?
提前致谢。
这是一个例子:
bash_operator.BashOperator(
task_id="process_csv",
bash_command="gsutil cp gs://your_bucket/your_file.csv your_file.csv && "
"process_file your_file.csv > processed_file.csv && "
"gsutil cp processed_file.csv gs://your_bucket/processed_file.csv",
execution_timeout=timedelta(hours=1),
dag=dag
)
您可以在此存储库中找到更多示例 https://github.com/blockchain-etl/bitcoin-etl-airflow/blob/develop/dags/bitcoinetl/build_export_dag.py。
您也可以使用 PythonOperator 代替 BashOperator。可以在此处找到一些示例 https://github.com/blockchain-etl/ethereum-etl-airflow/blob/master/dags/export_dag.py