Writing spark dataframe from azure databricks to S3 causes java.lang.VerifyError: Bad type on operand stack error

I am using the following code to save a spark dataframe to S3 (as a CSV file):

import traceback

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# Attached the spark submit command used
# spark-submit --master local[1] \
#     --packages org.apache.hadoop:hadoop-aws:2.7.3,com.amazonaws:aws-java-sdk-s3:1.11.98 my_file.py

ACCESS_KEY_ID = "xxxxxxxxxx"
SECRET_ACCESS_KEY = "yyyyyyyyyyyyy"
BUCKET_NAME = "zzzz"

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL data source example") \
    .getOrCreate()

df = spark.createDataFrame(["10", "11", "13"], StringType()).toDF("age")
df.show()

try:
    spark.conf.set("fs.s3n.awsAccessKeyId", ACCESS_KEY_ID)
    spark.conf.set("fs.s3n.awsSecretAccessKey", SECRET_ACCESS_KEY)
    spark.conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

    output_directory = 's3n://' + BUCKET_NAME + '/' + str("azure_dbs")
    df.write.save(output_directory + '_csv', format='csv', header=True, mode="overwrite")
    print("Written successful")
except Exception as exp:
    print("Exception occurred")
    print(exp)
    print(traceback.format_exc())

When I run it from my local system (using spark-submit), it writes to S3 successfully. The spark-submit command used is:

spark-submit --master local[1] --packages org.apache.hadoop:hadoop-aws:2.7.3,com.amazonaws:aws-java-sdk-s3:1.11.98 my_file.py

But when I run this as a job from an Azure Databricks notebook, with these packages attached as additional dependencies of the job, I get the following error:


    py4j.protocol.Py4JJavaError: An error occurred while calling o252.save.
    : java.lang.VerifyError: Bad type on operand stack
    Exception Details:
      Location:
        org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.copy(Ljava/lang/String;Ljava/lang/String;)V @152: invokevirtual
      Reason:
        Type 'org/jets3t/service/model/S3Object' (current frame, stack[4]) is not assignable to 'org/jets3t/service/model/StorageObject'
      Current Frame:
        bci: @152
        flags: { }
        locals: { 'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore', 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object' }
        stack: { 'org/jets3t/service/S3Service', 'java/lang/String', 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object', integer }
      Bytecode:
        0x0000000: b200 41b9 0067 0100 9900 36b2 0041 bb00
        0x0000010: 5959 b700 5a12 68b6 005b 2bb6 005b 1269
        0x0000020: b600 5b2c b600 5b12 6ab6 005b 2ab4 0023
        0x0000030: b600 3cb6 005b b600 5cb9 006b 0200 2ab4
        0x0000040: 0011 9900 302a b400 0c2a b400 232b 0101
        0x0000050: 0101 b600 6c4e 2ab4 001c 0994 9e00 162d
        0x0000060: b600 6d2a b400 1c94 9e00 0a2a 2d2c b600
        0x0000070: 6eb1 bb00 2a59 2cb7 002b 4e2d 2ab4 001f
        0x0000080: b600 302a b400 0c2a b400 23b6 003c 2b2a
        0x0000090: b400 23b6 003c 2d03 b600 6f57 a700 0a4e
        0x00000a0: 2a2d 2bb7 0035 b1                      
      Exception Handler Table:
        bci [0, 113] => handler: 159
        bci [114, 156] => handler: 159
      Stackmap Table:
        same_frame(@62)
        same_frame(@114)
        same_locals_1_stack_item_frame(@159,Object[#216])
        same_frame(@166)

        at org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(NativeS3FileSystem.java:342)
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:332)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
        at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:94)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
        at com.databricks.sql.transaction.tahoe.DeltaTableUtils$.findDeltaTableRoot(DeltaTable.scala:103)
        at com.databricks.sql.transaction.tahoe.DeltaValidation$.validateNonDeltaWrite(DeltaValidation.scala:94)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:261)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
        at py4j.Gateway.invoke(Gateway.java:295)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:251)
        at java.lang.Thread.run(Thread.java:748)

(While running this as a notebook job from Azure Databricks, I do not create a new spark session as in the local-machine scenario; I use the existing spark session provided by Databricks.)

What is the cause of this error? Do we need any additional packages when running from Azure Databricks?

Spark-submit packages included: org.apache.hadoop:hadoop-aws:2.7.3, com.amazonaws:aws-java-sdk-s3:1.11.98

Local machine:
Python 3.6
Spark version 2.4.4 using Scala version 2.11.12

Databricks details:
Cluster information:
5.5 LTS (includes Apache Spark 2.4.3, Scala 2.11)
Python 3 (3.5)

In Azure Databricks, it seems we need to update the key used to set the configuration. Reference: the answer given by Carlos David Peña.


We need to use the key "spark.hadoop.fs.s3n.impl" instead of "fs.s3n.impl".

Note: There is no need to explicitly attach any dependency libraries to the job (Azure Databricks).
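
For concreteness, a minimal sketch of what the corrected configuration might look like on Databricks. Only the switch from "fs.s3n.impl" to "spark.hadoop.fs.s3n.impl" comes from the answer above; applying the same "spark.hadoop." prefix to the credential keys is an assumption, and the rest of the snippet simply reuses the names from the question.

    # Minimal sketch (assumption: the "spark.hadoop." prefix also applies to the
    # credential keys; the answer above only calls out the impl key).
    spark.conf.set("spark.hadoop.fs.s3n.awsAccessKeyId", ACCESS_KEY_ID)
    spark.conf.set("spark.hadoop.fs.s3n.awsSecretAccessKey", SECRET_ACCESS_KEY)
    spark.conf.set("spark.hadoop.fs.s3n.impl",
                   "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

    # Write the dataframe as CSV to the S3 bucket, as in the question.
    output_directory = "s3n://" + BUCKET_NAME + "/azure_dbs"
    df.write.save(output_directory + "_csv", format="csv", header=True, mode="overwrite")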