在 Spark Streaming 进程中消费 Kafka DStream
Consuming Kafka DStream in Spark Streaming Process
我在一个如下所示的 Spark 流处理程序中消费一个 Kafka 主题:
import ...
object KafkaStreaming {
// Entry point: builds the streaming context, wires it to Kafka, and blocks forever.
def main(args: Array[String]) {
// Local-mode config; local[*] uses every available core on this machine.
val conf = new SparkConf().setAppName("KafkaStreaming").setMaster("local[*]")
val sc = new SparkContext(conf)
// Micro-batch interval of 10 seconds.
val ssc = new StreamingContext(sc, Seconds(10))
val kafkaConf = Map(
...
)
// Direct stream subscribed to a single topic named "topic".
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Seq("topic"), kafkaConf)
)
// Keep only the record values, discarding the Kafka keys.
val lines: DStream[String] = messages.map(_.value)
// Split each record value into individual lines.
val line: DStream[String] = lines.flatMap(_.split("\n"))
process(line)
ssc.start()
ssc.awaitTermination()
}
def process(line: DStream[String]): Unit =
{
// here is where I want to convert the DStream to JSON
// NOTE(review): this does not compile — JSON.parseFull expects a String,
// but `line` is a DStream[String]. This mismatch is exactly what the
// question below is asking how to resolve.
var json: Option[Any] = JSON.parseFull(line) // <--
println(json.getOrElse("json is NULL"))
if(json.isEmpty == false) {
println("NOT FALSE")
var map = json.get.asInstanceOf[Map[String, Any]]
// use every member of JSON document to access the value
map.get("any json element").toString
// do some other manipulation
}
}
}
在 process
函数中,我想操作字符串的每一行以从中提取一个 JSON 对象并执行进一步的处理和持久化。我该怎么做?
可以让 process 接收 String(而不是 DStream[String]),然后通过 DStream.map 和 foreachRDD 来调用它:
// Reworked signature: operate on one line at a time; the body (???) is a
// placeholder left for the reader to fill in with JSON parsing/persistence.
def process(line: String): Unit = ???
然后:
// Suggested pipeline: run `process` on each line, then act per RDD partition.
messages
.map(_.value)
.flatMap(_.split("\n"))
.map(process)
.foreachRDD { rdd =>
rdd.foreachPartition { itr =>
// Do stuff with `Iterator[String]` after JSON transformation
// NOTE(review): as written, process returns Unit, so `itr` is actually an
// Iterator[Unit]; process would need to return String (or the parsed JSON)
// for the comment above to hold — confirm intended return type.
}
}
我在一个如下所示的 Spark 流处理程序中消费一个 Kafka 主题:
import ...
object KafkaStreaming {
// Entry point: builds the streaming context, wires it to Kafka, and blocks forever.
def main(args: Array[String]) {
// Local-mode config; local[*] uses every available core on this machine.
val conf = new SparkConf().setAppName("KafkaStreaming").setMaster("local[*]")
val sc = new SparkContext(conf)
// Micro-batch interval of 10 seconds.
val ssc = new StreamingContext(sc, Seconds(10))
val kafkaConf = Map(
...
)
// Direct stream subscribed to a single topic named "topic".
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Seq("topic"), kafkaConf)
)
// Keep only the record values, discarding the Kafka keys.
val lines: DStream[String] = messages.map(_.value)
// Split each record value into individual lines.
val line: DStream[String] = lines.flatMap(_.split("\n"))
process(line)
ssc.start()
ssc.awaitTermination()
}
def process(line: DStream[String]): Unit =
{
// here is where I want to convert the DStream to JSON
// NOTE(review): this does not compile — JSON.parseFull expects a String,
// but `line` is a DStream[String]. This mismatch is exactly what the
// question below is asking how to resolve.
var json: Option[Any] = JSON.parseFull(line) // <--
println(json.getOrElse("json is NULL"))
if(json.isEmpty == false) {
println("NOT FALSE")
var map = json.get.asInstanceOf[Map[String, Any]]
// use every member of JSON document to access the value
map.get("any json element").toString
// do some other manipulation
}
}
}
在 process
函数中,我想操作字符串的每一行以从中提取一个 JSON 对象并执行进一步的处理和持久化。我该怎么做?
可以让 process 接收 String(而不是 DStream[String]),然后通过 DStream.map 和 foreachRDD 来调用它:
// Reworked signature: operate on one line at a time; the body (???) is a
// placeholder left for the reader to fill in with JSON parsing/persistence.
def process(line: String): Unit = ???
然后:
// Suggested pipeline: run `process` on each line, then act per RDD partition.
messages
.map(_.value)
.flatMap(_.split("\n"))
.map(process)
.foreachRDD { rdd =>
rdd.foreachPartition { itr =>
// Do stuff with `Iterator[String]` after JSON transformation
// NOTE(review): as written, process returns Unit, so `itr` is actually an
// Iterator[Unit]; process would need to return String (or the parsed JSON)
// for the comment above to hold — confirm intended return type.
}
}