如何使延迟加载 Apache Spark Dataframe 连接到 REST API
How to make a lazy loading Apache Spark Dataframe connected to a REST API
我很高兴 Spark 允许我与数据库 JDBC 建立连接 table,然后在其上构建转换,直到触发评估。我想用 REST API 连接做同样的事情。理论上,这将提供一种在逻辑视图中集成 DB 和 API 信息的方法。是否可以将 Spark 数据帧绑定到自定义函数,该函数使用延迟评估生成的参数调用 API?
这里有一些可以玩的 pySpark 代码:
import findspark, json, requests
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("basic test") \
.getOrCreate()
url = 'http://worldclockapi.com/api/json/utc/now'
headers = {"accept": "application/json;charset=UTF-8"}
results = requests.get(url, headers=headers)
obj = json.loads(results.text)
mydict = {k:v for k,v in obj.items() if v is not None}
df = spark.createDataFrame([mydict])
df.show()
在此示例中,API 调用不是惰性调用,也不是由 show() 触发的。
我知道通过 pySpark API 这很容易是不可能的。可以在 Scala 中完成吗?有没有可以在 Spark 中做到这一点的软件包?
可能与
有关
延迟调用 REST API 是可能的,但你需要将它放在 map
函数中(在处理 RDD 时)或 UDF 中(在 Dataframe API 中):
>>> from pyspark.sql import Row
>>> from pyspark.sql.functions import *
>>> import requests
>>>
>>> urls = [Row(url='http://worldclockapi.com/api/json/utc/now')] * 10
>>> call_time_api = lambda url: requests.get(url).json()['currentFileTime']
>>>
>>> spark.createDataFrame(urls) \
... .withColumn('time', udf(call_time_api)('url')) \
... .show(truncate=False)
+-----------------------------------------+------------------+
|url |time |
+-----------------------------------------+------------------+
|http://worldclockapi.com/api/json/utc/now|131879608910925580|
|http://worldclockapi.com/api/json/utc/now|131879608911081830|
|http://worldclockapi.com/api/json/utc/now|131879608911238454|
|http://worldclockapi.com/api/json/utc/now|131879608911550881|
|http://worldclockapi.com/api/json/utc/now|131879608911706855|
|http://worldclockapi.com/api/json/utc/now|131879608911706855|
|http://worldclockapi.com/api/json/utc/now|131879608911863229|
|http://worldclockapi.com/api/json/utc/now|131879608912019732|
|http://worldclockapi.com/api/json/utc/now|131879608912175607|
|http://worldclockapi.com/api/json/utc/now|131879608912175607|
+-----------------------------------------+------------------+
事实上,这对于通过分页废弃 API 结果非常有用 - 首先您创建一个 URL 数组(每个 URL 用于不同的结果页面),然后您可以在 spark 上下文中并行检索数据并且创建一个结果数据框。
我很高兴 Spark 允许我与数据库 JDBC 建立连接 table,然后在其上构建转换,直到触发评估。我想用 REST API 连接做同样的事情。理论上,这将提供一种在逻辑视图中集成 DB 和 API 信息的方法。是否可以将 Spark 数据帧绑定到自定义函数,该函数使用延迟评估生成的参数调用 API?
这里有一些可以玩的 pySpark 代码:
import findspark, json, requests
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("basic test") \
.getOrCreate()
url = 'http://worldclockapi.com/api/json/utc/now'
headers = {"accept": "application/json;charset=UTF-8"}
results = requests.get(url, headers=headers)
obj = json.loads(results.text)
mydict = {k:v for k,v in obj.items() if v is not None}
df = spark.createDataFrame([mydict])
df.show()
在此示例中,API 调用不是惰性调用,也不是由 show() 触发的。
我知道通过 pySpark API 这很容易是不可能的。可以在 Scala 中完成吗?有没有可以在 Spark 中做到这一点的软件包?
可能与
延迟调用 REST API 是可能的,但你需要将它放在 map
函数中(在处理 RDD 时)或 UDF 中(在 Dataframe API 中):
>>> from pyspark.sql import Row
>>> from pyspark.sql.functions import *
>>> import requests
>>>
>>> urls = [Row(url='http://worldclockapi.com/api/json/utc/now')] * 10
>>> call_time_api = lambda url: requests.get(url).json()['currentFileTime']
>>>
>>> spark.createDataFrame(urls) \
... .withColumn('time', udf(call_time_api)('url')) \
... .show(truncate=False)
+-----------------------------------------+------------------+
|url |time |
+-----------------------------------------+------------------+
|http://worldclockapi.com/api/json/utc/now|131879608910925580|
|http://worldclockapi.com/api/json/utc/now|131879608911081830|
|http://worldclockapi.com/api/json/utc/now|131879608911238454|
|http://worldclockapi.com/api/json/utc/now|131879608911550881|
|http://worldclockapi.com/api/json/utc/now|131879608911706855|
|http://worldclockapi.com/api/json/utc/now|131879608911706855|
|http://worldclockapi.com/api/json/utc/now|131879608911863229|
|http://worldclockapi.com/api/json/utc/now|131879608912019732|
|http://worldclockapi.com/api/json/utc/now|131879608912175607|
|http://worldclockapi.com/api/json/utc/now|131879608912175607|
+-----------------------------------------+------------------+
事实上,这对于通过分页废弃 API 结果非常有用 - 首先您创建一个 URL 数组(每个 URL 用于不同的结果页面),然后您可以在 spark 上下文中并行检索数据并且创建一个结果数据框。