Pyspark 文本文件 json 带缩进
Pyspark textFile json with indentation
Wanne 读取了一个 json 文件并在 RDD 中进行了缩进,但是 spark 然后抛出了一个异常。
# txts = sc.textFile('data/jsons_without_indentation') # works
txts = sc.textFile('data/jsons_with_indentation') # fails
txts_dicts = txts.map(lambda data: json.loads(data))
txts_dicts.collect()
sc.wholeTextFiles 也不行。是否可以加载带有缩进的 json 而无需先将其转换为文件?
示例 json 文件如下所示:
{
"data": {
"text": {
"de": "Ein Text.",
"en": "A text."
}
}
}
如果这只是每个文件一个 JSON 文档,则您只需要 SparkContext.wholeTextFiles
。首先让我们创建一些虚拟数据:
import tempfile
import json
input_dir = tempfile.mkdtemp()
docs = [
{'data': {'text': {'de': 'Ein Text.', 'en': 'A text.'}}},
{'data': {'text': {'de': 'Ein Bahnhof.', 'en': 'A railway station.'}}},
{'data': {'text': {'de': 'Ein Hund.', 'en': 'A dog.'}}}]
for doc in docs:
with open(tempfile.mktemp(suffix="json", dir=input_dir), "w") as fw:
json.dump(doc, fw, indent=4)
现在让我们读取数据:
rdd = sc.wholeTextFiles(input_dir).values()
并确保文件确实缩进:
print rdd.top(1)[0]
## {
## "data": {
## "text": {
## "de": "Ein Text.",
## "en": "A text."
## }
## }
## }
终于可以解析了:
parsed = rdd.map(json.loads)
并检查是否一切正常:
parsed.takeOrdered(3)
## [{u'data': {u'text': {u'de': u'Ein Bahnhof.', u'en': u'A railway station.'}}},
## {u'data': {u'text': {u'de': u'Ein Hund.', u'en': u'A dog.'}}},
## {u'data': {u'text': {u'de': u'Ein Text.', u'en': u'A text.'}}}]
如果您仍然遇到一些问题,很可能是由于某些格式不正确的条目。您可以做的最简单的事情是使用 flatMap
和自定义包装器丢弃格式错误的条目:
rdd_malformed = sc.parallelize(["{u'data': {u'text': {u'de':"]).union(rdd)
## org.apache.spark.api.python.PythonException: Traceback (most recent call ...
## ...
## ValueError: Expecting property name: line 1 column 2 (char 1)
并使用 try_seq
包装(此处定义:What is the equivalent to scala.util.Try in pyspark?)
rdd_malformed.flatMap(lambda x: seq_try(json.loads, x)).collect()
## [{u'data': {u'text': {u'de': u'Ein Hund.', u'en': u'A dog.'}}},
## {u'data': {u'text': {u'de': u'Ein Text.', u'en': u'A text.'}}},
## {u'data': {u'text': {u'de': u'Ein Bahnhof.', u'en': u'A railway station.'}}}]
Wanne 读取了一个 json 文件并在 RDD 中进行了缩进,但是 spark 然后抛出了一个异常。
# txts = sc.textFile('data/jsons_without_indentation') # works
txts = sc.textFile('data/jsons_with_indentation') # fails
txts_dicts = txts.map(lambda data: json.loads(data))
txts_dicts.collect()
sc.wholeTextFiles 也不行。是否可以加载带有缩进的 json 而无需先将其转换为文件?
示例 json 文件如下所示:
{
"data": {
"text": {
"de": "Ein Text.",
"en": "A text."
}
}
}
如果这只是每个文件一个 JSON 文档,则您只需要 SparkContext.wholeTextFiles
。首先让我们创建一些虚拟数据:
import tempfile
import json
input_dir = tempfile.mkdtemp()
docs = [
{'data': {'text': {'de': 'Ein Text.', 'en': 'A text.'}}},
{'data': {'text': {'de': 'Ein Bahnhof.', 'en': 'A railway station.'}}},
{'data': {'text': {'de': 'Ein Hund.', 'en': 'A dog.'}}}]
for doc in docs:
with open(tempfile.mktemp(suffix="json", dir=input_dir), "w") as fw:
json.dump(doc, fw, indent=4)
现在让我们读取数据:
rdd = sc.wholeTextFiles(input_dir).values()
并确保文件确实缩进:
print rdd.top(1)[0]
## {
## "data": {
## "text": {
## "de": "Ein Text.",
## "en": "A text."
## }
## }
## }
终于可以解析了:
parsed = rdd.map(json.loads)
并检查是否一切正常:
parsed.takeOrdered(3)
## [{u'data': {u'text': {u'de': u'Ein Bahnhof.', u'en': u'A railway station.'}}},
## {u'data': {u'text': {u'de': u'Ein Hund.', u'en': u'A dog.'}}},
## {u'data': {u'text': {u'de': u'Ein Text.', u'en': u'A text.'}}}]
如果您仍然遇到一些问题,很可能是由于某些格式不正确的条目。您可以做的最简单的事情是使用 flatMap
和自定义包装器丢弃格式错误的条目:
rdd_malformed = sc.parallelize(["{u'data': {u'text': {u'de':"]).union(rdd)
## org.apache.spark.api.python.PythonException: Traceback (most recent call ...
## ...
## ValueError: Expecting property name: line 1 column 2 (char 1)
并使用 try_seq
包装(此处定义:What is the equivalent to scala.util.Try in pyspark?)
rdd_malformed.flatMap(lambda x: seq_try(json.loads, x)).collect()
## [{u'data': {u'text': {u'de': u'Ein Hund.', u'en': u'A dog.'}}},
## {u'data': {u'text': {u'de': u'Ein Text.', u'en': u'A text.'}}},
## {u'data': {u'text': {u'de': u'Ein Bahnhof.', u'en': u'A railway station.'}}}]