Spark SQL:如何使用来自 REST 服务的 json 数据作为 DataFrame

Spark SQL: How to consume json data from a REST service as DataFrame

我需要从提供 REST 接口的 Web 服务中读取一些 JSON 数据,以便从我的 SPARK SQL 代码中查询数据以进行分析。我能够读取存储在 blob 存储中的 JSON 并使用它。

我想知道从 REST 服务读取数据并像使用任何其他服务一样使用它的最佳方法是什么 DataFrame

顺便说一句,如果有帮助,我正在使用 SPARK 1.6 of Linux cluster on HD insight。如果有人可以分享任何代码片段,我将不胜感激,因为我对 SPARK 环境仍然很陌生。

在 Spark 1.6 上:

如果您在 Python,请使用 requests library to get the information and then just create an RDD from it. There must be some similar library for Scala (relevant thread)。 然后就这样做:

json_str = '{"executorCores": 2, "kind": "pyspark", "driverMemory": 1000}'
rdd = sc.parallelize([json_str])
json_df = sqlContext.jsonRDD(rdd)
json_df

Scala 代码:

val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)

本文来自: http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets

Spark 无法将任意 json 解析为数据帧,因为 json 是层次结构,而数据帧是扁平的。如果您的 json 不是由 spark 创建的,它很可能不符合 condition "Each line must contain a separate, self-contained valid JSON object" 因此需要使用您的自定义代码进行解析,然后作为case-class 对象或 spark sql 行。

你可以这样下载:

import scalaj.http._
val response = Http("proto:///path/to/json")
  .header("key", "val").method("get")
  .execute().asString.body

然后将您的 json 解析为 shown in this answer。然后创建一个你的案例的对象序列-class(比如seq)并创建一个数据框为

seq.toDF

给你:- spark 2.2


import org.apache.spark.sql._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.DefaultHttpClient
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel

object SparkRestApi {
  def main(args: Array[String]): Unit = {

    val logger = Logger.getLogger("blah")
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .appName("blah")
      .config("spark.sql.warehouse.dir", "C:\Temp\hive")
      .master("local[2]")
      //.enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val url = "https://api.github.com/users/hadley/orgs"
    val result2 = List(scala.io.Source.fromURL(url).mkString)
    val githubRdd2=spark.sparkContext.makeRDD(result2)
    val gitHubDF2=spark.read.json(githubRdd2)
    println(gitHubDF2)
    gitHubDF2.show()

    spark.stop()
  }
}