如何使用 `ssc.fileStream()` 读取镶木地板文件?传递给 `ssc.fileStream()` 的类型是什么?
How to read parquet files using `ssc.fileStream()`? What are the types passed to `ssc.fileStream()`?
我对Spark的fileStream()
方法的理解是,它接受三种类型作为参数:Key
、Value
和Format
。对于文本文件,适当的类型为:LongWritable
、Text
和 TextInputFormat
.
首先,想了解一下这几种类型的本质。凭直觉,我猜本例中的 Key
是文件的行号,而 Value
是该行的文本。因此,在以下文本文件示例中:
Hello
Test
Another Test
DStream
的第一行将有 1
的 Key
(0
?)和 Hello
的 Value
.
这是正确的吗?
我的问题的第二部分:我查看了 ParquetInputFormat
的反编译实现,我发现了一些奇怪的东西:
public class ParquetInputFormat<T>
extends FileInputFormat<Void, T> {
//...
public class TextInputFormat
extends FileInputFormat<LongWritable, Text>
implements JobConfigurable {
//...
TextInputFormat
扩展 LongWritable
和 Text
类型的 FileInputFormat
,而 ParquetInputFormat
扩展相同的 class 类型 Void
和 T
.
这是否意味着我必须创建一个 Value
class 来保存整行我的 parquet 数据,然后将类型 <Void, MyClass, ParquetInputFormat<MyClass>>
传递给 ssc.fileStream()
?
如果可以,我该如何实现MyClass
?
编辑 1:我注意到一个 readSupportClass
将被传递给 ParquetInputFormat
对象。这是什么class,它是如何用来解析parquet文件的?是否有一些文档涵盖了这一点?
编辑 2:据我所知,这是不可能。如果有人知道如何将镶木地板文件流式传输到 Spark,请随时分享...
我在 Spark Streaming 中读取镶木地板文件的示例如下。
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "parquet.avro.AvroReadSupport")
val stream = ssc.fileStream[Void, GenericRecord, ParquetInputFormat[GenericRecord]](
directory, { path: Path => path.toString.endsWith("parquet") }, true, ssc.sparkContext.hadoopConfiguration)
val lines = stream.map(row => {
println("row:" + row.toString())
row
})
有些点是...
- 记录类型为 GenericRecord
- readSupportClass 是 AvroReadSupport
- 将配置传递给 fileStream
- 将parquet.read.support.class设置为配置
我参考了下面的源代码来创建示例。
而且我也找不到很好的例子。
我想等更好的。
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
https://github.com/Parquet/parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
您可以通过添加一些 parquet
特定 hadoop
设置来访问镶木地板:
val ssc = new StreamingContext(conf, Seconds(5))
var schema =StructType(Seq(
StructField("a", StringType, nullable = false),
........
))
val schemaJson=schema.json
val fileDir="/tmp/fileDir"
ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport") ssc.sparkContext.hadoopConfiguration.set("org.apache.spark.sql.parquet.row.requested_schema", schemaJson)
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_BINARY_AS_STRING.key, "false")
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, "false")
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, "false")
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_BINARY_AS_STRING.key, "false")
val streamRdd = ssc.fileStream[Void, UnsafeRow, ParquetInputFormat[UnsafeRow]](fileDir,(t: org.apache.hadoop.fs.Path) => true, false)
streamRdd.count().print()
ssc.start()
ssc.awaitTermination()
此代码是使用 Spark 2.1.0
编写的。
我对Spark的fileStream()
方法的理解是,它接受三种类型作为参数:Key
、Value
和Format
。对于文本文件,适当的类型为:LongWritable
、Text
和 TextInputFormat
.
首先,想了解一下这几种类型的本质。凭直觉,我猜本例中的 Key
是文件的行号,而 Value
是该行的文本。因此,在以下文本文件示例中:
Hello
Test
Another Test
DStream
的第一行将有 1
的 Key
(0
?)和 Hello
的 Value
.
这是正确的吗?
我的问题的第二部分:我查看了 ParquetInputFormat
的反编译实现,我发现了一些奇怪的东西:
public class ParquetInputFormat<T>
extends FileInputFormat<Void, T> {
//...
public class TextInputFormat
extends FileInputFormat<LongWritable, Text>
implements JobConfigurable {
//...
TextInputFormat
扩展 LongWritable
和 Text
类型的 FileInputFormat
,而 ParquetInputFormat
扩展相同的 class 类型 Void
和 T
.
这是否意味着我必须创建一个 Value
class 来保存整行我的 parquet 数据,然后将类型 <Void, MyClass, ParquetInputFormat<MyClass>>
传递给 ssc.fileStream()
?
如果可以,我该如何实现MyClass
?
编辑 1:我注意到一个 readSupportClass
将被传递给 ParquetInputFormat
对象。这是什么class,它是如何用来解析parquet文件的?是否有一些文档涵盖了这一点?
编辑 2:据我所知,这是不可能。如果有人知道如何将镶木地板文件流式传输到 Spark,请随时分享...
我在 Spark Streaming 中读取镶木地板文件的示例如下。
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "parquet.avro.AvroReadSupport")
val stream = ssc.fileStream[Void, GenericRecord, ParquetInputFormat[GenericRecord]](
directory, { path: Path => path.toString.endsWith("parquet") }, true, ssc.sparkContext.hadoopConfiguration)
val lines = stream.map(row => {
println("row:" + row.toString())
row
})
有些点是...
- 记录类型为 GenericRecord
- readSupportClass 是 AvroReadSupport
- 将配置传递给 fileStream
- 将parquet.read.support.class设置为配置
我参考了下面的源代码来创建示例。
而且我也找不到很好的例子。
我想等更好的。
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
https://github.com/Parquet/parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
您可以通过添加一些 parquet
特定 hadoop
设置来访问镶木地板:
val ssc = new StreamingContext(conf, Seconds(5))
var schema =StructType(Seq(
StructField("a", StringType, nullable = false),
........
))
val schemaJson=schema.json
val fileDir="/tmp/fileDir"
ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport") ssc.sparkContext.hadoopConfiguration.set("org.apache.spark.sql.parquet.row.requested_schema", schemaJson)
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_BINARY_AS_STRING.key, "false")
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, "false")
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, "false")
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_BINARY_AS_STRING.key, "false")
val streamRdd = ssc.fileStream[Void, UnsafeRow, ParquetInputFormat[UnsafeRow]](fileDir,(t: org.apache.hadoop.fs.Path) => true, false)
streamRdd.count().print()
ssc.start()
ssc.awaitTermination()
此代码是使用 Spark 2.1.0
编写的。