Spark error: "value foreach is not a member of Object"

The dataframe consists of two columns (s3ObjectName, batchName) and has tens of thousands of rows, for example:

s3ObjectName  batchName
a1.json       45
b2.json       45
c3.json       45
d4.json       46
e5.json       46

The objective is to retrieve the objects from an S3 bucket and write them to the data lake in parallel, using foreachPartition() and foreach() to consume the details in each row of the dataframe.

// S3 connector details are defined in an object so they can be serialized
// and made available on every executor in the cluster
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}

object container {

  // Builds a new S3 client from credentials stored in Databricks secrets
  def getDataSource(): AmazonS3 = {
    val AccessKey = dbutils.secrets.get(scope = "ADBTEL_Scope", key = "Telematics-TrueMotion-AccessKey-ID")
    val SecretKey = dbutils.secrets.get(scope = "ADBTEL_Scope", key = "Telematics-TrueMotion-AccessKey-Secret")
    val creds = new BasicAWSCredentials(AccessKey, SecretKey)
    val clientRegion: Regions = Regions.US_EAST_1
    AmazonS3ClientBuilder.standard()
      .withRegion(clientRegion)
      .withCredentials(new AWSStaticCredentialsProvider(creds))
      .build()
  }
}

dataframe.foreachPartition(partition => {
  // Initialize the S3 connection once per partition
  val client: AmazonS3 = container.getDataSource()
  partition.foreach(row => {
    val s3ObjectName = row.getString(0)
    val batchname = row.getString(1)
    val inputS3Stream = client.getObject("s3bucketname", s3ObjectName).getObjectContent
    val inputS3String = IOUtils.toString(inputS3Stream, "UTF-8")
    val filePath = s"/dbfs/mnt/test/${batchname}/${s3ObjectName}"
    val file = new File(filePath)
    val fileWriter = new FileWriter(file)
    val bw = new BufferedWriter(fileWriter)
    bw.write(inputS3String)
    bw.close()
    fileWriter.close()
  })
})

The above code gives me

Error: value foreach is not a member of Object

Convert the DataFrame to an RDD before calling foreachPartition. On Scala 2.12, Dataset.foreachPartition has two overloads (one taking a Scala function, one taking a Java ForeachPartitionFunction), so the compiler cannot resolve the lambda's parameter type and infers it as Object, which has no foreach method. RDD.foreachPartition has a single signature, so the lambda receives a properly typed Iterator[Row]:

dataframe.rdd.foreachPartition(partition => {

})
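
For completeness, the full loop after the conversion could look like the sketch below. It reuses the bucket name, mount path, and container object from the question; the mkdirs() call and the row.get(1).toString read are assumptions added so the write does not fail when the batch directory is missing or when batchName is a numeric rather than a string column:

import org.apache.commons.io.IOUtils
import java.io.{BufferedWriter, File, FileWriter}
import com.amazonaws.services.s3.AmazonS3

dataframe.rdd.foreachPartition(partition => {
  // One S3 client per partition, created on the executor
  val client: AmazonS3 = container.getDataSource()
  partition.foreach(row => {
    val s3ObjectName = row.getString(0)
    // toString works whether batchName is a string or a numeric column (assumption)
    val batchName = row.get(1).toString
    val inputS3Stream = client.getObject("s3bucketname", s3ObjectName).getObjectContent
    val inputS3String = try IOUtils.toString(inputS3Stream, "UTF-8") finally inputS3Stream.close()
    val file = new File(s"/dbfs/mnt/test/${batchName}/${s3ObjectName}")
    file.getParentFile.mkdirs() // FileWriter does not create missing directories
    val bw = new BufferedWriter(new FileWriter(file))
    try bw.write(inputS3String) finally bw.close()
  })
})

With .rdd in place, the lambda parameter is a plain Iterator[Row], so partition.foreach compiles as expected.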