Move data from Google Cloud Storage to S3 using Dataproc Hadoop cluster and Airflow

I am trying to transfer a large amount of data from GCS to an S3 bucket. I have spun up a Hadoop cluster using Google Dataproc.

I can run the job through the Hadoop CLI with the following command:

hadoop distcp -update gs://GCS-bucket/folder s3a://[my_aws_access_id]:[my_aws_secret]@aws-bucket/folder 

I am new to MapReduce and Hadoop. I am trying to add this to my Airflow workflow using the DataProcHadoopOperator:
export_to_s3 = DataProcHadoopOperator(
            task_id='export_to_s3',
            main_jar=None,
            main_class=None,
            arguments=None,
            archives=None,
            files=None,
            job_name='{{task.task_id}}_{{ds_nodash}}',
            cluster_name='optimize-m',
            dataproc_hadoop_properties=None,
            dataproc_hadoop_jars=None,
            gcp_conn_id='google_cloud_default',
            delegate_to=None,
            region='global',
            dag=dag
)

My Airflow instance runs dockerized on a Compute Engine instance.

I am not sure how to configure the operator to produce the following job:

hadoop distcp -update gs://GCS-bucket/folder s3a://[my_aws_access_id]:[my_aws_secret]@aws-bucket/folder 

I followed the suggestion and built the following Airflow task:

export_to_s3 = DataProcHadoopOperator(
            task_id='export_to_s3',
            main_jar='file:///usr/lib/hadoop-mapreduce/hadoop-distcp.jar',
            main_class=None,
            arguments='-update gs://umg-comm-tech-dev/data/apollo/QA/ s3a://[mys3accessid]:[mys3secret]@s3://umg-ers-analytics/qubole/user-data/pitched/optimize/QA/'.split(' '),
            archives=None,
            files=None,
            job_name='{{task.task_id}}_{{ds_nodash}}',
            cluster_name='optimize',
            dataproc_hadoop_properties=None,
            dataproc_hadoop_jars=None,
            gcp_conn_id='google_cloud_default',
            delegate_to=None,
            region='global',
            dag=dag
)

However, I am now getting the following error:

18/01/18 10:13:42 INFO gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.2-hadoop2
18/01/18 10:13:42 WARN s3native.S3xLoginHelper: The Filesystem URI contains login details. This is insecure and may be unsupported in future.
18/01/18 10:13:43 WARN s3a.S3AFileSystem: Client: Amazon S3 error 400: 400 Bad Request; Bad Request (retryable)

com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8F6A80AA7432A696), S3 Extended Request ID: U6j5J9djR5UPPjhbjjLOtn7dG4IXDyMZfTD6CuFk5V6MXdUP65ArF56zP4Okx2NScxqYVh/UCTI=
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1182)
    at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1107)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1070)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:276)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:236)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2812)
    at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:100)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2849)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2831)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356)
    at org.apache.hadoop.tools.DistCp.setTargetPathExists(DistCp.java:226)
    at org.apache.hadoop.tools.DistCp.run(DistCp.java:118)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
    at org.apache.hadoop.tools.DistCp.main(DistCp.java:462)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:234)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:148)
    at com.google.cloud.hadoop.services.agent.job.shim.HadoopRunJarShim.main(HadoopRunJarShim.java:12)
18/01/18 10:13:43 ERROR tools.DistCp: Invalid arguments: 
org.apache.hadoop.fs.s3a.AWSS3IOException: doesBucketExist on s3: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8F6A80AA7432A696), S3 Extended Request ID: U6j5J9djR5UPPjhbjjLOtn7dG4IXDyMZfTD6CuFk5V6MXdUP65ArF56zP4Okx2NScxqYVh/UCTI=: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8F6A80AA7432A696)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:178)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:282)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:236)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2812)
    at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:100)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2849)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2831)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356)
    at org.apache.hadoop.tools.DistCp.setTargetPathExists(DistCp.java:226)
    at org.apache.hadoop.tools.DistCp.run(DistCp.java:118)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
    at org.apache.hadoop.tools.DistCp.main(DistCp.java:462)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:234)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:148)
    at com.google.cloud.hadoop.services.agent.job.shim.HadoopRunJarShim.main(HadoopRunJarShim.java:12)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8F6A80AA7432A696), S3 Extended Request ID: U6j5J9djR5UPPjhbjjLOtn7dG4IXDyMZfTD6CuFk5V6MXdUP65ArF56zP4Okx2NScxqYVh/UCTI=
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1182)
    at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1107)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1070)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:276)
    ... 18 more
Invalid arguments: doesBucketExist on s3: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8F6A80AA7432A696), S3 Extended Request ID: U6j5J9djR5UPPjhbjjLOtn7dG4IXDyMZfTD6CuFk5V6MXdUP65ArF56zP4Okx2NScxqYVh/UCTI=: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8F6A80AA7432A696)
usage: distcp OPTIONS [source_path...] <target_path>
              OPTIONS
 -append                       Reuse existing data in target files and
                               append new data to them if possible
 -async                        Should distcp execution be blocking
 -atomic                       Commit all changes or none
 -bandwidth <arg>              Specify bandwidth per map in MB
 -delete                       Delete from target, files missing in source
 -diff <arg>                   Use snapshot diff report to identify the
                               difference between source and target
 -f <arg>                      List of files that need to be copied
 -filelimit <arg>              (Deprecated!) Limit number of files copied
                               to <= n
 -filters <arg>                The path to a file containing a list of
                               strings for paths to be excluded from the
                               copy.
 -i                            Ignore failures during copy
 -log <arg>                    Folder on DFS where distcp execution logs
                               are saved
 -m <arg>                      Max number of concurrent maps to use for
                               copy
 -mapredSslConf <arg>          Configuration for ssl config file, to use
                               with hftps://. Must be in the classpath.
 -numListstatusThreads <arg>   Number of threads to use for building file
                               listing (max 40).
 -overwrite                    Choose to overwrite target files
                               unconditionally, even if they exist.
 -p <arg>                      preserve status (rbugpcaxt)(replication,
                               block-size, user, group, permission,
                               checksum-type, ACL, XATTR, timestamps). If
                               -p is specified with no <arg>, then
                               preserves replication, block size, user,
                               group, permission, checksum type and
                               timestamps. raw.* xattrs are preserved when
                               both the source and destination paths are
                               in the /.reserved/raw hierarchy (HDFS
                               only). raw.* xattrpreservation is
                               independent of the -p flag. Refer to the
                               DistCp documentation for more details.
 -sizelimit <arg>              (Deprecated!) Limit number of files copied
                               to <= n bytes
 -skipcrccheck                 Whether to skip CRC checks between source
                               and target paths.
 -strategy <arg>               Copy strategy to use. Default is dividing
                               work based on file sizes
 -tmp <arg>                    Intermediate work path to be used for
                               atomic commit
 -update                       Update target, copying only missingfiles or
                               directories

Any help is greatly appreciated.

Thanks!

Why use Dataproc at all? Wouldn't a gsutil command be simpler?

For example:

gsutil -m rsync -r gs://GCS s3://S3

An operation like this will move your data from GCS to S3. Note the -m flag, which makes the sync run with parallel uploads; you can also add the -d flag to delete files at the destination that have been removed from the source.
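If you still want to drive this from Airflow, a minimal sketch would be to wrap that gsutil command in a BashOperator. This assumes gsutil is installed on the Airflow worker, your AWS credentials are configured for it (e.g. in the Boto config), and the bucket paths below are placeholders:

from airflow.operators.bash_operator import BashOperator

# Hypothetical task: rsync the GCS folder to S3 with parallel uploads (-m).
export_to_s3 = BashOperator(
    task_id='export_to_s3',
    bash_command='gsutil -m rsync -r gs://GCS-bucket/folder s3://aws-bucket/folder',
    dag=dag,
)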

I am not familiar with Airflow, but it looks like DataProcHadoopOperator is just a wrapper around gcloud dataproc jobs submit hadoop.

I believe hadoop distcp is just a wrapper around hadoop jar /usr/lib/hadoop-mapreduce/hadoop-distcp.jar, so submitting distcp through the Dataproc API would look like this:

gcloud dataproc jobs submit hadoop --cluster=optimize --jar file:///usr/lib/hadoop-mapreduce/hadoop-distcp.jar -- -update gs://GCS-bucket/folder s3a://[my_aws_access_id]:[my_aws_secret]@aws-bucket/folder

So for Airflow you probably want something like:

main_jar='file:///usr/lib/hadoop-mapreduce/hadoop-distcp.jar',
arguments='-update gs://GCS-bucket/folder s3a://[my_aws_access_id]:[my_aws_secret]@aws-bucket/folder'.split(' '),

FYI, if optimize-m is the name of the master VM, then the cluster_name argument in Airflow is probably just optimize.
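Putting those pieces together, a sketch of the full Airflow task might look like the following. The credential values and bucket paths are placeholders, and passing the keys through dataproc_hadoop_properties (the standard fs.s3a.access.key / fs.s3a.secret.key Hadoop properties) is an assumed way to keep them out of the target URI, which the S3xLoginHelper warning in your log flags as insecure:

export_to_s3 = DataProcHadoopOperator(
    task_id='export_to_s3',
    # distcp ships with Dataproc under /usr/lib/hadoop-mapreduce
    main_jar='file:///usr/lib/hadoop-mapreduce/hadoop-distcp.jar',
    # Target is a plain s3a:// URI, not s3a://key:secret@... and not a nested s3:// path
    arguments=['-update',
               'gs://GCS-bucket/folder',
               's3a://aws-bucket/folder'],
    # Assumed: supply the S3A credentials as Hadoop job properties
    # instead of embedding them in the URI; the values are placeholders.
    dataproc_hadoop_properties={
        'fs.s3a.access.key': '[my_aws_access_id]',
        'fs.s3a.secret.key': '[my_aws_secret]',
    },
    job_name='{{task.task_id}}_{{ds_nodash}}',
    cluster_name='optimize',  # master VM is optimize-m, so the cluster name is optimize
    gcp_conn_id='google_cloud_default',
    region='global',
    dag=dag,
)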