Spark Streaming:DirectStream RDD 到数据帧
SparkStreaming: DirectStream RDD to dataframe
我正在研究 spark streaming context,它在 avro 序列化中从 kafka 主题获取数据,如下所示。
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"schema.registry.url" -> "http://localhost:8081",
"key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
"value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
"group.id" -> "1"
)
我正在使用 Kafka utils 创建 Direct 流,如下所示
val topics = Set("mysql-foobar")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String,String](
topics,
kafkaParams)
)
我也将数据写入控制台
stream.foreachRDD ( rdd => {
rdd.foreachPartition(iterator => {
while (iterator.hasNext) {
val next = iterator.next()
println(next.value())
}
})
})
现在我想从这些 RDD 创建数据框。有没有可能我已经审查并测试了来自 Whosebug 的许多解决方案,但遇到了一些问题。 Whosebug 解决方案也是 this and 。
我的输出如下所示
{"c1": 4, "c2": "Jarry", "create_ts": 1536758512000, "update_ts": 1537204805000}
由于您使用的是 Confluent 序列化程序,并且它们目前不提供与 Spark 的简单集成,您可以在 Github 上查看 AbsaOSS 上的一个相对较新的库,它可以帮助解决这个问题。
但基本上,您使用 Spark Structured Streaming 来获取 DataFrames,不要尝试使用 Dstream 到 RDD 到 Dataframe...
你可以找到examples of what you're looking for here
另请参阅
中的其他示例
我正在研究 spark streaming context,它在 avro 序列化中从 kafka 主题获取数据,如下所示。
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"schema.registry.url" -> "http://localhost:8081",
"key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
"value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
"group.id" -> "1"
)
我正在使用 Kafka utils 创建 Direct 流,如下所示
val topics = Set("mysql-foobar")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String,String](
topics,
kafkaParams)
)
我也将数据写入控制台
stream.foreachRDD ( rdd => {
rdd.foreachPartition(iterator => {
while (iterator.hasNext) {
val next = iterator.next()
println(next.value())
}
})
})
现在我想从这些 RDD 创建数据框。有没有可能我已经审查并测试了来自 Whosebug 的许多解决方案,但遇到了一些问题。 Whosebug 解决方案也是 this and
{"c1": 4, "c2": "Jarry", "create_ts": 1536758512000, "update_ts": 1537204805000}
由于您使用的是 Confluent 序列化程序,并且它们目前不提供与 Spark 的简单集成,您可以在 Github 上查看 AbsaOSS 上的一个相对较新的库,它可以帮助解决这个问题。
但基本上,您使用 Spark Structured Streaming 来获取 DataFrames,不要尝试使用 Dstream 到 RDD 到 Dataframe...
你可以找到examples of what you're looking for here
另请参阅