如何使用 Databricks 在 Apache Spark 中使用 SparkSQL 或 PySpark 查询 JSON 中的数组

How to query arrays in JSON with either SparkSQL or PySpark in Apache Spark with Databricks

我在 JSON 文件中有数据(只显示第一行)

{
    "cd_created_date": "2021-10-05T21:33:39.480933",
    "cd_jurisdiction": "PROBATE",
    "cd_last_modified": "2021-10-05T21:35:04.061105",
    "cd_last_state_modified_date": "2021-10-05T21:35:04.060968",
    "cd_latest_state": "WillWithdrawn",
    "cd_reference": 1633469619443286,
    "cd_security_classification": "PUBLIC",
    "cd_version": 7,
    "ce_case_data_id": 3483511,
    "ce_case_type_id": "WillLodgement",
    "ce_case_type_version": 170,
    "ce_created_date": "2021-10-05T21:33:51.189872",
    "ce_data": "{\"willDate\": \"1950-01-01\", \"jointWill\": \"Yes\", \"lodgedDate\": \"1970-03-03\", \"codicilDate\": \"1962-02-02\", \"executorTitle\": \"Mr\", \"lodgementType\": \"safeCustody\", \"deceasedGender\": \"male\", \"applicationType\": \"Personal\", \"deceasedAddress\": {\"County\": \"London\", \"Country\": \"United Kingdom\", \"PostCode\": \"SW1A 1AA\", \"PostTown\": \"London\", \"AddressLine1\": \"1\", \"AddressLine2\": \"Buckingham Palace\", \"AddressLine3\": \"The place to be\"}, \"deceasedSurname\": \"E2E_deceased_surname_1633469477956\", \"executorAddress\": {\"County\": \"London\", \"Country\": \"United Kingdom\", \"PostCode\": \"SW1A 1AA\", \"PostTown\": \"London\", \"AddressLine1\": \"1\", \"AddressLine2\": \"Buckingham Palace\", \"AddressLine3\": \"The place to be\"}, \"executorSurname\": \"executor1_surname\", \"numberOfCodicils\": \"3\", \"registryLocation\": \"Liverpool\", \"deceasedForenames\": \"E2E_deceased_forenames_1633469477956\", \"documentsUploaded\": [{\"id\": \"b1181bfb-d0a7-49d8-8301-b06e58eb42c1\", \"value\": {\"Comment\": \"test file to upload\", \"DocumentLink\": {\"document_url\": \"http://dm-store-aat.service.core-compute-aat.internal/documents/60cd0a78-648e-4af7-9f83-15a380f1786d\", \"document_filename\": \"test_file_for_document_upload.png\", \"document_binary_url\": \"http://dm-store-aat.service.core-compute-aat.internal/documents/60cd0a78-648e-4af7-9f83-15a380f1786d/binary\"}, \"DocumentType\": \"email\"}}], \"executorForenames\": \"executor1_forenames\", \"deceasedDateOfBirth\": \"1930-01-01\", \"deceasedDateOfDeath\": \"2017-01-01\", \"deceasedTypeOfDeath\": \"diedOnOrAbout\", \"deceasedEmailAddress\": \"primary@probate-test.com\", \"executorEmailAddress\": \"executor1@probate-test.com\", \"deceasedAnyOtherNames\": \"Yes\", \"additionalExecutorList\": [{\"id\": \"bd8d7ca2-ed84-424c-a241-40fd99b15596\", \"value\": {\"executorTitle\": \"Dr\", \"executorAddress\": {\"County\": \"London\", \"Country\": \"United Kingdom\", \"PostCode\": \"SW1A 1AA\", \"PostTown\": \"London\", \"AddressLine1\": \"1\", \"AddressLine2\": \"Buckingham Palace\", \"AddressLine3\": \"The place to be\"}, \"executorSurname\": \"executor2_surname\", \"executorForenames\": \"executor2_forenames\", \"executorEmailAddress\": \"executor2@probate-test.com\"}}], \"deceasedFullAliasNameList\": [{\"id\": \"1970ac9d-532e-48f2-8851-04ae6eec973f\", \"value\": {\"FullAliasName\": \"deceased_alias1_1633469477956\"}}, {\"id\": \"86c842aa-e10f-44d5-8c28-a7ce8a1cb0eb\", \"value\": {\"FullAliasName\": \"deceased_alias2\"}}]}",
    "ce_description": "upload_document_event_description_text",
    "ce_event_id": "uploadDocument",
    "ce_event_name": "Upload document",
    "ce_id": 30638630,
    "ce_security_classification": "PUBLIC",
    "ce_state_id": "WillLodgementCreated",
    "ce_state_name": "Will lodgement created",
    "ce_summary": "upload_document_event_summary_text",
    "ce_user_first_name": "Probate",
    "ce_user_id": "349978",
    "ce_user_last_name": "Backoffice",
    "extraction_date": "2021-10-06"
}

因为你可以看到字段 ce_data 包含一个数组。

当我使用 Databricks 在 Apache Spark 中读取 JSON 时,我得到以下 printSchema()

root
 |-- cd_created_date: string (nullable = true)
 |-- cd_jurisdiction: string (nullable = true)
 |-- cd_last_modified: string (nullable = true)
 |-- cd_last_state_modified_date: string (nullable = true)
 |-- cd_latest_state: string (nullable = true)
 |-- cd_reference: long (nullable = true)
 |-- cd_security_classification: string (nullable = true)
 |-- cd_version: long (nullable = true)
 |-- ce_case_data_id: long (nullable = true)
 |-- ce_case_type_id: string (nullable = true)
 |-- ce_case_type_version: long (nullable = true)
 |-- ce_created_date: string (nullable = true)
 **|-- ce_data: string (nullable = true)**
 |-- ce_description: string (nullable = true)
 |-- ce_event_id: string (nullable = true)
 |-- ce_event_name: string (nullable = true)
 |-- ce_id: long (nullable = true)
 |-- ce_security_classification: string (nullable = true)
 |-- ce_state_id: string (nullable = true)
 |-- ce_state_name: string (nullable = true)
 |-- ce_summary: string (nullable = true)
 |-- ce_user_first_name: string (nullable = true)
 |-- ce_user_id: string (nullable = true)
 |-- ce_user_last_name: string (nullable = true)
 |-- extraction_date: string (nullable = true)

从上面的 Databricks 中的 PrintSchema 可以看出,字段 ce_data 没有显示为数组。

但是,我想查询ce_data字段中的数组。例如,我想编写一个查询来找到 lodgeDate = 1970-03-03?

我的尝试是这样的

test = spark.sql("""select ce_data from testtable where ce_data.lodgeDate = '1970-03-03'""")

当我在 Databricks 中输入上述代码时出现的错误是:

Can't extract value from ce_data#12747: need struct type but got string;

所以,我首先需要了解为什么我没有在 printSchema() 中看到数组,但是我的主要问题是如何使用 sparkSQL 在 JSON 中查询数组。

我也想知道是否需要导入一些库?

看起来您的字段是 json 字符串而不是数组。您可以使用“from_json”方法将其转换为结构类型。然后你可以用上面的代码查询它。 请注意,您需要 json 的架构才能使用 from_json 方法。例如

  from pyspark.sql import functions as f
  # get your json schema
  json_string = "{\"willDate\": \"1950-01-01\", \"jointWill\": \"Yes\", \"lodgedDate\": \"1970-03-03\", \"codicilDate\": \"1962-02-02\", \"executorTitle\": \"Mr\", \"lodgementType\": \"safeCustody\", \"deceasedGender\": \"male\", \"applicationType\": \"Personal\", \"deceasedAddress\": {\"County\": \"London\", \"Country\": \"United Kingdom\", \"PostCode\": \"SW1A 1AA\", \"PostTown\": \"London\", \"AddressLine1\": \"1\", \"AddressLine2\": \"Buckingham Palace\", \"AddressLine3\": \"The place to be\"}, \"deceasedSurname\": \"E2E_deceased_surname_1633469477956\", \"executorAddress\": {\"County\": \"London\", \"Country\": \"United Kingdom\", \"PostCode\": \"SW1A 1AA\", \"PostTown\": \"London\", \"AddressLine1\": \"1\", \"AddressLine2\": \"Buckingham Palace\", \"AddressLine3\": \"The place to be\"}, \"executorSurname\": \"executor1_surname\", \"numberOfCodicils\": \"3\", \"registryLocation\": \"Liverpool\", \"deceasedForenames\": \"E2E_deceased_forenames_1633469477956\", \"documentsUploaded\": [{\"id\": \"b1181bfb-d0a7-49d8-8301-b06e58eb42c1\", \"value\": {\"Comment\": \"test file to upload\", \"DocumentLink\": {\"document_url\": \"http://dm-store-aat.service.core-compute-aat.internal/documents/60cd0a78-648e-4af7-9f83-15a380f1786d\", \"document_filename\": \"test_file_for_document_upload.png\", \"document_binary_url\": \"http://dm-store-aat.service.core-compute-aat.internal/documents/60cd0a78-648e-4af7-9f83-15a380f1786d/binary\"}, \"DocumentType\": \"email\"}}], \"executorForenames\": \"executor1_forenames\", \"deceasedDateOfBirth\": \"1930-01-01\", \"deceasedDateOfDeath\": \"2017-01-01\", \"deceasedTypeOfDeath\": \"diedOnOrAbout\", \"deceasedEmailAddress\": \"primary@probate-test.com\", \"executorEmailAddress\": \"executor1@probate-test.com\", \"deceasedAnyOtherNames\": \"Yes\", \"additionalExecutorList\": [{\"id\": \"bd8d7ca2-ed84-424c-a241-40fd99b15596\", \"value\": {\"executorTitle\": \"Dr\", \"executorAddress\": {\"County\": \"London\", \"Country\": \"United Kingdom\", \"PostCode\": \"SW1A 1AA\", \"PostTown\": \"London\", \"AddressLine1\": \"1\", \"AddressLine2\": \"Buckingham Palace\", \"AddressLine3\": \"The place to be\"}, \"executorSurname\": \"executor2_surname\", \"executorForenames\": \"executor2_forenames\", \"executorEmailAddress\": \"executor2@probate-test.com\"}}], \"deceasedFullAliasNameList\": [{\"id\": \"1970ac9d-532e-48f2-8851-04ae6eec973f\", \"value\": {\"FullAliasName\": \"deceased_alias1_1633469477956\"}}, {\"id\": \"86c842aa-e10f-44d5-8c28-a7ce8a1cb0eb\", \"value\": {\"FullAliasName\": \"deceased_alias2\"}}]}"
  your_json_schema = f.schema_of_json(json_string)
  df = df.withColumn(“ce_data”,   f.from_json(df.ce_data,schema=your_json_schema)
   
   df = df.filter("ce_data.lodgeDate = '1970-03-03'")

如果结构中有数组。您可以使用 f.explode 方法为每个数组字段创建一个新行。然后从那里你可以像查询任何列一样查询它们

正如您已经提到的,ce_data 是一个 包含 JSON 内容 的字符串,假设 JSON 有效,您可以使用get_json_object 函数提取 JSON 的属性,像这样

spark.sql("""
select ce_data
from testtable
where get_json_object(ce_data, "$.lodgedDate") = "1970-03-03"
""").show()

但是,如果您问我,我会说我更喜欢 Python 语法而不是 SQL 语法。这样干净多了

from pyspark.sql import functions as F
(df
    .where(F.get_json_object('ce_data', '$.lodgedDate') == '1970-03-03')
    .show()
)