Apache Spark 在 Scala 中嵌套迭代以生成统计数据 RDD
Apache Spark nested iterations in Scala to generate stats RDD
我有一个由 rowkey = client_id、campaigns = {campaign_id:campaign_name}
组成的 Json 数组
val clientsRDD = resultRDD.map(ClientRow.parseClientRow)
// change RDD of ClientRow objects to a DataFrame
val clientsDF = clientsRDD.toDF()
// Return the schema of this DataFrame
clientsDF.printSchema()
// print each line DataFrame
clientsDF.collect().foreach(println)
输出:
root
|-- rowkey: string (nullable = true)
|-- campaigns: string (nullable = true)
[1,[{"1000":"campaign1"},{"1001":"campaign2"}]]
[2,[{"1002":"campaign3"}]]
我还有一个 RDD,其中包含来自 HBase 的所有客户和活动数据的记录。
记录RDD
rowkey type body
client_id-campaign_id, record_type, record_text
我的目标是为每个客户(考虑其所有活动)和每个活动生成统计信息,例如计算所有 client_id 记录,按类型分组并计算每个活动记录,按类型。
client1
records:100, login:20, actions:80
client1 campaign1
records:70, login:16, actions:50
client1 campaign2
records:30, login:4, actions:30
最后我要写统计
在 Spark 中使用 Scala 执行此操作的最佳方法是什么?
我是否必须迭代 clientsRDD(映射?),并为每一行生成不同的 RDD 映射 recordsRDD?
首先您需要为活动字段定义架构:
它的意思是
您使用
定义模式
val schema = StructType(Seq(StructField("rowkey", StringType, true),
StructField("campaigns", StructType(
StructField("id", StringType, true) ::
StructField("name", StringType, true) :: Nil
))
))
然后您可以在活动字段中使用 explode
方法使行变平。
val df = sqlContext.createDataFrame(clientsRDD, schema)
df.select(col("rowkey"), explode(col("campaigns")).as("campaign")).filter(col("campaign.id") === 1)
我有一个由 rowkey = client_id、campaigns = {campaign_id:campaign_name}
组成的 Json 数组val clientsRDD = resultRDD.map(ClientRow.parseClientRow)
// change RDD of ClientRow objects to a DataFrame
val clientsDF = clientsRDD.toDF()
// Return the schema of this DataFrame
clientsDF.printSchema()
// print each line DataFrame
clientsDF.collect().foreach(println)
输出:
root
|-- rowkey: string (nullable = true)
|-- campaigns: string (nullable = true)
[1,[{"1000":"campaign1"},{"1001":"campaign2"}]]
[2,[{"1002":"campaign3"}]]
我还有一个 RDD,其中包含来自 HBase 的所有客户和活动数据的记录。
记录RDD
rowkey type body
client_id-campaign_id, record_type, record_text
我的目标是为每个客户(考虑其所有活动)和每个活动生成统计信息,例如计算所有 client_id 记录,按类型分组并计算每个活动记录,按类型。
client1
records:100, login:20, actions:80
client1 campaign1
records:70, login:16, actions:50
client1 campaign2
records:30, login:4, actions:30
最后我要写统计
在 Spark 中使用 Scala 执行此操作的最佳方法是什么? 我是否必须迭代 clientsRDD(映射?),并为每一行生成不同的 RDD 映射 recordsRDD?
首先您需要为活动字段定义架构: 它的意思是 您使用
定义模式val schema = StructType(Seq(StructField("rowkey", StringType, true),
StructField("campaigns", StructType(
StructField("id", StringType, true) ::
StructField("name", StringType, true) :: Nil
))
))
然后您可以在活动字段中使用 explode
方法使行变平。
val df = sqlContext.createDataFrame(clientsRDD, schema)
df.select(col("rowkey"), explode(col("campaigns")).as("campaign")).filter(col("campaign.id") === 1)