How to query arrays in JSON with either SparkSQL or PySpark in Apache Spark with Databricks
I have data in a JSON file (showing only the first row):
{
"cd_created_date": "2021-10-05T21:33:39.480933",
"cd_jurisdiction": "PROBATE",
"cd_last_modified": "2021-10-05T21:35:04.061105",
"cd_last_state_modified_date": "2021-10-05T21:35:04.060968",
"cd_latest_state": "WillWithdrawn",
"cd_reference": 1633469619443286,
"cd_security_classification": "PUBLIC",
"cd_version": 7,
"ce_case_data_id": 3483511,
"ce_case_type_id": "WillLodgement",
"ce_case_type_version": 170,
"ce_created_date": "2021-10-05T21:33:51.189872",
"ce_data": "{\"willDate\": \"1950-01-01\", \"jointWill\": \"Yes\", \"lodgedDate\": \"1970-03-03\", \"codicilDate\": \"1962-02-02\", \"executorTitle\": \"Mr\", \"lodgementType\": \"safeCustody\", \"deceasedGender\": \"male\", \"applicationType\": \"Personal\", \"deceasedAddress\": {\"County\": \"London\", \"Country\": \"United Kingdom\", \"PostCode\": \"SW1A 1AA\", \"PostTown\": \"London\", \"AddressLine1\": \"1\", \"AddressLine2\": \"Buckingham Palace\", \"AddressLine3\": \"The place to be\"}, \"deceasedSurname\": \"E2E_deceased_surname_1633469477956\", \"executorAddress\": {\"County\": \"London\", \"Country\": \"United Kingdom\", \"PostCode\": \"SW1A 1AA\", \"PostTown\": \"London\", \"AddressLine1\": \"1\", \"AddressLine2\": \"Buckingham Palace\", \"AddressLine3\": \"The place to be\"}, \"executorSurname\": \"executor1_surname\", \"numberOfCodicils\": \"3\", \"registryLocation\": \"Liverpool\", \"deceasedForenames\": \"E2E_deceased_forenames_1633469477956\", \"documentsUploaded\": [{\"id\": \"b1181bfb-d0a7-49d8-8301-b06e58eb42c1\", \"value\": {\"Comment\": \"test file to upload\", \"DocumentLink\": {\"document_url\": \"http://dm-store-aat.service.core-compute-aat.internal/documents/60cd0a78-648e-4af7-9f83-15a380f1786d\", \"document_filename\": \"test_file_for_document_upload.png\", \"document_binary_url\": \"http://dm-store-aat.service.core-compute-aat.internal/documents/60cd0a78-648e-4af7-9f83-15a380f1786d/binary\"}, \"DocumentType\": \"email\"}}], \"executorForenames\": \"executor1_forenames\", \"deceasedDateOfBirth\": \"1930-01-01\", \"deceasedDateOfDeath\": \"2017-01-01\", \"deceasedTypeOfDeath\": \"diedOnOrAbout\", \"deceasedEmailAddress\": \"primary@probate-test.com\", \"executorEmailAddress\": \"executor1@probate-test.com\", \"deceasedAnyOtherNames\": \"Yes\", \"additionalExecutorList\": [{\"id\": \"bd8d7ca2-ed84-424c-a241-40fd99b15596\", \"value\": {\"executorTitle\": \"Dr\", \"executorAddress\": {\"County\": \"London\", \"Country\": \"United Kingdom\", 
\"PostCode\": \"SW1A 1AA\", \"PostTown\": \"London\", \"AddressLine1\": \"1\", \"AddressLine2\": \"Buckingham Palace\", \"AddressLine3\": \"The place to be\"}, \"executorSurname\": \"executor2_surname\", \"executorForenames\": \"executor2_forenames\", \"executorEmailAddress\": \"executor2@probate-test.com\"}}], \"deceasedFullAliasNameList\": [{\"id\": \"1970ac9d-532e-48f2-8851-04ae6eec973f\", \"value\": {\"FullAliasName\": \"deceased_alias1_1633469477956\"}}, {\"id\": \"86c842aa-e10f-44d5-8c28-a7ce8a1cb0eb\", \"value\": {\"FullAliasName\": \"deceased_alias2\"}}]}",
"ce_description": "upload_document_event_description_text",
"ce_event_id": "uploadDocument",
"ce_event_name": "Upload document",
"ce_id": 30638630,
"ce_security_classification": "PUBLIC",
"ce_state_id": "WillLodgementCreated",
"ce_state_name": "Will lodgement created",
"ce_summary": "upload_document_event_summary_text",
"ce_user_first_name": "Probate",
"ce_user_id": "349978",
"ce_user_last_name": "Backoffice",
"extraction_date": "2021-10-06"
}
As you can see, the field ce_data contains an array.
When I read the JSON into Apache Spark on Databricks, printSchema() gives me the following:
root
|-- cd_created_date: string (nullable = true)
|-- cd_jurisdiction: string (nullable = true)
|-- cd_last_modified: string (nullable = true)
|-- cd_last_state_modified_date: string (nullable = true)
|-- cd_latest_state: string (nullable = true)
|-- cd_reference: long (nullable = true)
|-- cd_security_classification: string (nullable = true)
|-- cd_version: long (nullable = true)
|-- ce_case_data_id: long (nullable = true)
|-- ce_case_type_id: string (nullable = true)
|-- ce_case_type_version: long (nullable = true)
|-- ce_created_date: string (nullable = true)
**|-- ce_data: string (nullable = true)**
|-- ce_description: string (nullable = true)
|-- ce_event_id: string (nullable = true)
|-- ce_event_name: string (nullable = true)
|-- ce_id: long (nullable = true)
|-- ce_security_classification: string (nullable = true)
|-- ce_state_id: string (nullable = true)
|-- ce_state_name: string (nullable = true)
|-- ce_summary: string (nullable = true)
|-- ce_user_first_name: string (nullable = true)
|-- ce_user_id: string (nullable = true)
|-- ce_user_last_name: string (nullable = true)
|-- extraction_date: string (nullable = true)
As the printSchema() output above shows, the field ce_data is not displayed as an array.
However, I want to query the array inside the ce_data field. For example, I want to write a query that finds rows where lodgedDate = 1970-03-03.
My attempt was this:
test = spark.sql("""select ce_data from testtable where ce_data.lodgeDate = '1970-03-03'""")
The error I get when I run the code above in Databricks is:
Can't extract value from ce_data#12747: need struct type but got string;
So, first I need to understand why I don't see an array in printSchema(), but my main question is how to query arrays inside the JSON with SparkSQL.
I'd also like to know whether I need to import any libraries.
It looks like your field is a JSON string rather than an array. You can convert it to a struct type with the from_json function, and then query it with code like yours.
Note that from_json needs the schema of the JSON. For example:
from pyspark.sql import functions as f
# infer the schema from a sample of the JSON payload
json_string = "{\"willDate\": \"1950-01-01\", \"jointWill\": \"Yes\", \"lodgedDate\": \"1970-03-03\", \"codicilDate\": \"1962-02-02\", \"executorTitle\": \"Mr\", \"lodgementType\": \"safeCustody\", \"deceasedGender\": \"male\", \"applicationType\": \"Personal\", \"deceasedAddress\": {\"County\": \"London\", \"Country\": \"United Kingdom\", \"PostCode\": \"SW1A 1AA\", \"PostTown\": \"London\", \"AddressLine1\": \"1\", \"AddressLine2\": \"Buckingham Palace\", \"AddressLine3\": \"The place to be\"}, \"deceasedSurname\": \"E2E_deceased_surname_1633469477956\", \"executorAddress\": {\"County\": \"London\", \"Country\": \"United Kingdom\", \"PostCode\": \"SW1A 1AA\", \"PostTown\": \"London\", \"AddressLine1\": \"1\", \"AddressLine2\": \"Buckingham Palace\", \"AddressLine3\": \"The place to be\"}, \"executorSurname\": \"executor1_surname\", \"numberOfCodicils\": \"3\", \"registryLocation\": \"Liverpool\", \"deceasedForenames\": \"E2E_deceased_forenames_1633469477956\", \"documentsUploaded\": [{\"id\": \"b1181bfb-d0a7-49d8-8301-b06e58eb42c1\", \"value\": {\"Comment\": \"test file to upload\", \"DocumentLink\": {\"document_url\": \"http://dm-store-aat.service.core-compute-aat.internal/documents/60cd0a78-648e-4af7-9f83-15a380f1786d\", \"document_filename\": \"test_file_for_document_upload.png\", \"document_binary_url\": \"http://dm-store-aat.service.core-compute-aat.internal/documents/60cd0a78-648e-4af7-9f83-15a380f1786d/binary\"}, \"DocumentType\": \"email\"}}], \"executorForenames\": \"executor1_forenames\", \"deceasedDateOfBirth\": \"1930-01-01\", \"deceasedDateOfDeath\": \"2017-01-01\", \"deceasedTypeOfDeath\": \"diedOnOrAbout\", \"deceasedEmailAddress\": \"primary@probate-test.com\", \"executorEmailAddress\": \"executor1@probate-test.com\", \"deceasedAnyOtherNames\": \"Yes\", \"additionalExecutorList\": [{\"id\": \"bd8d7ca2-ed84-424c-a241-40fd99b15596\", \"value\": {\"executorTitle\": \"Dr\", \"executorAddress\": {\"County\": \"London\", \"Country\": \"United Kingdom\", 
\"PostCode\": \"SW1A 1AA\", \"PostTown\": \"London\", \"AddressLine1\": \"1\", \"AddressLine2\": \"Buckingham Palace\", \"AddressLine3\": \"The place to be\"}, \"executorSurname\": \"executor2_surname\", \"executorForenames\": \"executor2_forenames\", \"executorEmailAddress\": \"executor2@probate-test.com\"}}], \"deceasedFullAliasNameList\": [{\"id\": \"1970ac9d-532e-48f2-8851-04ae6eec973f\", \"value\": {\"FullAliasName\": \"deceased_alias1_1633469477956\"}}, {\"id\": \"86c842aa-e10f-44d5-8c28-a7ce8a1cb0eb\", \"value\": {\"FullAliasName\": \"deceased_alias2\"}}]}"
your_json_schema = f.schema_of_json(json_string)
df = df.withColumn("ce_data", f.from_json(df.ce_data, schema=your_json_schema))
df = df.filter("ce_data.lodgedDate = '1970-03-03'")
If there are arrays inside the struct, you can use the f.explode function to create a new row for each array element. From there you can query them like any other column.
As you already mentioned, ce_data is a string containing JSON content. Assuming the JSON is valid, you can extract its attributes with the get_json_object function, like this:
spark.sql("""
select ce_data
from testtable
where get_json_object(ce_data, "$.lodgedDate") = "1970-03-03"
""").show()
However, if you ask me, I prefer the Python syntax over the SQL syntax; it is much cleaner:
from pyspark.sql import functions as F
(df
.where(F.get_json_object('ce_data', '$.lodgedDate') == '1970-03-03')
.show()
)