Spark SQL:如何使用来自 REST 服务的 json 数据作为 DataFrame
Spark SQL: How to consume json data from a REST service as DataFrame
我需要从提供 REST 接口的 Web 服务中读取一些 JSON 数据,以便从我的 SPARK SQL 代码中查询数据以进行分析。我能够读取存储在 blob 存储中的 JSON 并使用它。
我想知道从 REST 服务读取数据并像使用任何其他服务一样使用它的最佳方法是什么 DataFrame
。
顺便说一句,如果有帮助,我正在使用 SPARK 1.6 of Linux cluster on HD insight
。如果有人可以分享任何代码片段,我将不胜感激,因为我对 SPARK 环境仍然很陌生。
在 Spark 1.6 上:
如果您在 Python,请使用 requests library to get the information and then just create an RDD from it. There must be some similar library for Scala (relevant thread)。
然后就这样做:
json_str = '{"executorCores": 2, "kind": "pyspark", "driverMemory": 1000}'
rdd = sc.parallelize([json_str])
json_df = sqlContext.jsonRDD(rdd)
json_df
Scala 代码:
val anotherPeopleRDD = sc.parallelize(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
本文来自:
http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets
Spark 无法将任意 json 解析为数据帧,因为 json 是层次结构,而数据帧是扁平的。如果您的 json 不是由 spark 创建的,它很可能不符合 condition "Each line must contain a separate, self-contained valid JSON object" 因此需要使用您的自定义代码进行解析,然后作为case-class 对象或 spark sql 行。
你可以这样下载:
import scalaj.http._
val response = Http("proto:///path/to/json")
.header("key", "val").method("get")
.execute().asString.body
然后将您的 json 解析为 shown in this answer。然后创建一个你的案例的对象序列-class(比如seq)并创建一个数据框为
seq.toDF
给你:- spark 2.2
import org.apache.spark.sql._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.DefaultHttpClient
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
object SparkRestApi {
def main(args: Array[String]): Unit = {
val logger = Logger.getLogger("blah")
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
val spark = SparkSession.builder()
.appName("blah")
.config("spark.sql.warehouse.dir", "C:\Temp\hive")
.master("local[2]")
//.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val url = "https://api.github.com/users/hadley/orgs"
val result2 = List(scala.io.Source.fromURL(url).mkString)
val githubRdd2=spark.sparkContext.makeRDD(result2)
val gitHubDF2=spark.read.json(githubRdd2)
println(gitHubDF2)
gitHubDF2.show()
spark.stop()
}
}
我需要从提供 REST 接口的 Web 服务中读取一些 JSON 数据,以便从我的 SPARK SQL 代码中查询数据以进行分析。我能够读取存储在 blob 存储中的 JSON 并使用它。
我想知道从 REST 服务读取数据并像使用任何其他服务一样使用它的最佳方法是什么 DataFrame
。
顺便说一句,如果有帮助,我正在使用 SPARK 1.6 of Linux cluster on HD insight
。如果有人可以分享任何代码片段,我将不胜感激,因为我对 SPARK 环境仍然很陌生。
在 Spark 1.6 上:
如果您在 Python,请使用 requests library to get the information and then just create an RDD from it. There must be some similar library for Scala (relevant thread)。 然后就这样做:
json_str = '{"executorCores": 2, "kind": "pyspark", "driverMemory": 1000}'
rdd = sc.parallelize([json_str])
json_df = sqlContext.jsonRDD(rdd)
json_df
Scala 代码:
val anotherPeopleRDD = sc.parallelize(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
本文来自: http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets
Spark 无法将任意 json 解析为数据帧,因为 json 是层次结构,而数据帧是扁平的。如果您的 json 不是由 spark 创建的,它很可能不符合 condition "Each line must contain a separate, self-contained valid JSON object" 因此需要使用您的自定义代码进行解析,然后作为case-class 对象或 spark sql 行。
你可以这样下载:
import scalaj.http._
val response = Http("proto:///path/to/json")
.header("key", "val").method("get")
.execute().asString.body
然后将您的 json 解析为 shown in this answer。然后创建一个你的案例的对象序列-class(比如seq)并创建一个数据框为
seq.toDF
给你:- spark 2.2
import org.apache.spark.sql._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.DefaultHttpClient
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
object SparkRestApi {
def main(args: Array[String]): Unit = {
val logger = Logger.getLogger("blah")
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
val spark = SparkSession.builder()
.appName("blah")
.config("spark.sql.warehouse.dir", "C:\Temp\hive")
.master("local[2]")
//.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val url = "https://api.github.com/users/hadley/orgs"
val result2 = List(scala.io.Source.fromURL(url).mkString)
val githubRdd2=spark.sparkContext.makeRDD(result2)
val gitHubDF2=spark.read.json(githubRdd2)
println(gitHubDF2)
gitHubDF2.show()
spark.stop()
}
}