如何使用 `ssc.fileStream()` 读取镶木地板文件?传递给 `ssc.fileStream()` 的类型是什么?

How to read parquet files using `ssc.fileStream()`? What are the types passed to `ssc.fileStream()`?

我对Spark的fileStream()方法的理解是,它接受三种类型作为参数:KeyValueFormat。对于文本文件,适当的类型为:LongWritableTextTextInputFormat.

首先,想了解一下这几种类型的本质。凭直觉,我猜本例中的 Key 是文件的行号,而 Value 是该行的文本。因此,在以下文本文件示例中:

Hello
Test
Another Test

DStream 的第一行将有 1Key0?)和 HelloValue .

这是正确的吗?


我的问题的第二部分:我查看了 ParquetInputFormat 的反编译实现,我发现了一些奇怪的东西:

public class ParquetInputFormat<T>
       extends FileInputFormat<Void, T> {
//...

public class TextInputFormat
       extends FileInputFormat<LongWritable, Text>
       implements JobConfigurable {
//...

TextInputFormat 扩展 LongWritableText 类型的 FileInputFormat,而 ParquetInputFormat 扩展相同的 class 类型 VoidT.

这是否意味着我必须创建一个 Value class 来保存整行我的 parquet 数据,然后将类型 <Void, MyClass, ParquetInputFormat<MyClass>> 传递给 ssc.fileStream()

如果可以,我该如何实现MyClass


编辑 1:我注意到一个 readSupportClass 将被传递给 ParquetInputFormat 对象。这是什么class,它是如何用来解析parquet文件的?是否有一些文档涵盖了这一点?


编辑 2:据我所知,这是不可能。如果有人知道如何将镶木地板文件流式传输到 Spark,请随时分享...

我在 Spark Streaming 中读取镶木地板文件的示例如下。

val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "parquet.avro.AvroReadSupport")
val stream = ssc.fileStream[Void, GenericRecord, ParquetInputFormat[GenericRecord]](
  directory, { path: Path => path.toString.endsWith("parquet") }, true, ssc.sparkContext.hadoopConfiguration)

val lines = stream.map(row => {
  println("row:" + row.toString())
  row
})

有些点是...

  • 记录类型为 GenericRecord
  • readSupportClass 是 AvroReadSupport
  • 将配置传递给 fileStream
  • 将parquet.read.support.class设置为配置

我参考了下面的源代码来创建示例。
而且我也找不到很好的例子。
我想等更好的。

https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
https://github.com/Parquet/parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

您可以通过添加一些 parquet 特定 hadoop 设置来访问镶木地板:

val ssc = new StreamingContext(conf, Seconds(5))
var schema =StructType(Seq(
      StructField("a", StringType, nullable = false),
      ........

     ))
val schemaJson=schema.json

val fileDir="/tmp/fileDir"
ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport")  ssc.sparkContext.hadoopConfiguration.set("org.apache.spark.sql.parquet.row.requested_schema", schemaJson)
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_BINARY_AS_STRING.key, "false")
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, "false")
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, "false")
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_BINARY_AS_STRING.key, "false")

val streamRdd = ssc.fileStream[Void, UnsafeRow, ParquetInputFormat[UnsafeRow]](fileDir,(t: org.apache.hadoop.fs.Path) => true, false)

streamRdd.count().print()

ssc.start()
ssc.awaitTermination()

此代码是使用 Spark 2.1.0 编写的。