如何将 json 个对象的 spark RDD 映射到另一个包含仅具有选定属性集的对象的 RDD

How can I map a spark RDD of json objects to another RDD which contains objects with only selected set of attributes

我有一个包含 json 个对象的 spark RDD (productList),格式如下。

{u'name': u'product_id', u'price': 12, u'quantity': 1}'

现在,我想将其映射到另一个只包含 'product_id' 和 total_amount 的 RDD,即价格*数量。以下将生成 totalAmounts 列表。但是我怎样才能同时映射 product_id 和总量。

total_amount_list = productList.map(lambda x: x['price']*x['quantity'])

是这样的吗?

productList = sc.parallelize([
    {u'name': u'product_id', u'price': 12, u'quantity': 1}])

productList.map(
    lambda x: {'name': x['name'],  'total': x['price'] * x['quantity']}
).first()

## {'name': 'product_id', 'total': 12}

如果您的输入数据是 JSONL 文件,那么您应该考虑使用 DataFrames:

from pyspark.sql.functions import col

s = (
    '{"quantity": 1, "name": "product_id", "price": 12}\n'
    '{"quantity": 3, "name": "product_id2", "price": 5}'
)

with open('/tmp/test.jsonl', 'w') as fw:
  fw.write(s)

df = sqlContext.read.json('/tmp/test.jsonl')
df.withColumn('total', col('price') * col('quantity'))