Spark Error:- "value foreach is not a member of Object"
The dataframe consists of two columns (s3ObjectName, batchName) and has tens of thousands of rows, e.g.:
s3ObjectName | batchName |
---|---|
a1.json | 45 |
b2.json | 45 |
c3.json | 45 |
d4.json | 46 |
e5.json | 46 |
The objective is to retrieve each object from an S3 bucket and write it to the data lake in parallel, using the details in each row of the dataframe via the foreachPartition() and foreach() functions:
import java.io.{BufferedWriter, File, FileWriter}

import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import org.apache.commons.io.IOUtils

// S3 connector details defined as an object so it can be serialized and made
// available on every executor in the cluster
object container {
  def getDataSource(): AmazonS3 = {
    val AccessKey = dbutils.secrets.get(scope = "ADBTEL_Scope", key = "Telematics-TrueMotion-AccessKey-ID")
    val SecretKey = dbutils.secrets.get(scope = "ADBTEL_Scope", key = "Telematics-TrueMotion-AccessKey-Secret")
    val creds = new BasicAWSCredentials(AccessKey, SecretKey)
    val clientRegion: Regions = Regions.US_EAST_1
    AmazonS3ClientBuilder.standard()
      .withRegion(clientRegion)
      .withCredentials(new AWSStaticCredentialsProvider(creds))
      .build()
  }
}

dataframe.foreachPartition(partition => {
  // Initialize one S3 client per partition
  val client: AmazonS3 = container.getDataSource()
  partition.foreach(row => {
    val s3ObjectName = row.getString(0)
    val batchname = row.getString(1)
    val inputS3Stream = client.getObject("s3bucketname", s3ObjectName).getObjectContent
    val inputS3String = IOUtils.toString(inputS3Stream, "UTF-8")
    val filePath = s"/dbfs/mnt/test/${batchname}/${s3ObjectName}"
    val file = new File(filePath)
    val fileWriter = new FileWriter(file)
    val bw = new BufferedWriter(fileWriter)
    bw.write(inputS3String)
    bw.close()
    fileWriter.close()
  })
})
The above process gives me:

Error: value foreach is not a member of Object
Convert the Dataframe to an RDD before calling foreachPartition. In Spark 3.x, Dataset.foreachPartition is overloaded with a Java ForeachPartitionFunction variant, and the Scala compiler can resolve an untyped lambda to that overload, leaving the partition parameter typed as Object, which has no foreach method. RDD.foreachPartition has a single signature, Iterator[Row] => Unit, so the same lambda compiles:
dataframe.rdd.foreachPartition(partition => {
})
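For completeness, the corrected loop can be sketched in full. This is a sketch reusing the container object and bucket name from the question; the explicit Iterator[Row] annotation is optional once you go through the RDD, but makes the resolved type visible:

```scala
import java.io.{BufferedWriter, FileWriter}

import org.apache.commons.io.IOUtils
import org.apache.spark.sql.Row

// Going through the RDD gives foreachPartition a single Scala signature,
// Iterator[Row] => Unit, so `partition.foreach` resolves correctly.
dataframe.rdd.foreachPartition { partition: Iterator[Row] =>
  val client: AmazonS3 = container.getDataSource() // one client per partition
  partition.foreach { row =>
    val s3ObjectName = row.getString(0)
    val batchname = row.getString(1)
    val body = IOUtils.toString(
      client.getObject("s3bucketname", s3ObjectName).getObjectContent, "UTF-8")
    val bw = new BufferedWriter(new FileWriter(s"/dbfs/mnt/test/${batchname}/${s3ObjectName}"))
    try bw.write(body) finally bw.close() // closing bw also closes the FileWriter
  }
}
```

An alternative workaround sometimes used is to keep the Dataset and annotate the lambda parameter directly, e.g. `dataframe.foreachPartition((partition: Iterator[Row]) => ...)`, which steers overload resolution to the Scala variant without converting to an RDD.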