NotSerializableException with Neo4j Spark Streaming Scala
I am trying to run a query against Neo4j using the Neo4j-Spark connector. I want to pass values from the stream (produced by Kafka as strings) into my query. However, I get a serialization exception:
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@54688d9f)
- field (class: consumer.SparkConsumer$$anonfun$processingLogic, name: sc, type: class org.apache.spark.SparkContext)
- object (class consumer.SparkConsumer$$anonfun$processingLogic, <function1>)
- field (class: consumer.SparkConsumer$$anonfun$processingLogic$$anonfun$apply, name: $outer, type: class consumer.SparkConsumer$$anonfun$processingLogic)
- object (class consumer.SparkConsumer$$anonfun$processingLogic$$anonfun$apply, <function1>)
Here is the code of the main function and of the query logic:
object SparkConsumer {

  def main(args: Array[String]) {
    val config = "neo4j_local"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaSparkStreaming")
    setNeo4jSparkConfig(config, sparkConf)

    val sparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()

    val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(3))
    streamingContext.sparkContext.setLogLevel("ERROR")
    val sqlContext = new SQLContext(streamingContext.sparkContext)

    val numStreams = 2
    val topics = Array("member_topic1")

    def kafkaParams(i: Int) = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group2",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val lines = (1 to numStreams).map(i => KafkaUtils.createDirectStream[String, String](
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams(i))
    ))

    val messages = streamingContext.union(lines)
    // Split each Kafka record value into its comma-separated fields
    val wordsArrays = messages.map(_.value().split(","))

    // The inner closure references streamingContext.sparkContext,
    // which is where the NotSerializableException above is thrown
    wordsArrays.foreachRDD(rdd => rdd.foreach(
      data => execNeo4jSearchQuery(data)(streamingContext.sparkContext)
    ))

    streamingContext.start()
    streamingContext.awaitTermination()
  }

  def execNeo4jSearchQuery(data: Array[String])(implicit sc: SparkContext) = {
    val neo = Neo4j(sc)
    val query = "my query"
    val paramsMap = Map("lat" -> data(1).toDouble, "lon" -> data(2).toDouble, "id" -> data(0).toInt)
    val df = neo.cypher(query, paramsMap).loadDataFrame(
      "group_name" -> "string", "event_name" -> "string", "venue_name" -> "string", "distance" -> "double")
    println("\ndf:")
    df.show()
  }
}
Accessing the SparkContext or SparkSession, or creating distributed data structures, from executor code is not allowed. Therefore this:
wordsArrays.foreachRDD(rdd => rdd.foreach(
data => execNeo4jSearchQuery(data)(streamingContext.sparkContext)
))
where execNeo4jSearchQuery calls:
neo.cypher(query, paramsMap).loadDataFrame
is not valid Spark code.
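If each micro-batch is small enough to bring back to the driver, one workaround that keeps the existing connector call valid is to collect the batch first, so execNeo4jSearchQuery only ever runs on the driver. This is a minimal sketch under that small-batch assumption, reusing the method from the question:

wordsArrays.foreachRDD { rdd =>
  // Collect the (assumed small) micro-batch to the driver; the SparkContext
  // is then only touched in driver code, so no non-serializable object is shipped.
  rdd.collect().foreach { data =>
    execNeo4jSearchQuery(data)(streamingContext.sparkContext)
  }
}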
If you want to access Neo4j directly from RDD.foreach, you have to use a standard client (AnormCypher seems to provide a very elegant API) without converting to Spark distributed structures.
On a slightly unrelated note: you might consider opening a single connection for the whole set of records in each partition with foreachPartition, as sketched below.
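As an illustration of that note, here is a minimal sketch of the foreachPartition approach using the plain Neo4j Java driver (1.x) instead of the Spark connector. The bolt URL, the credentials, and the way the result is consumed are assumptions to adapt to your setup; "my query" is the placeholder Cypher from the question.

import org.neo4j.driver.v1.{AuthTokens, GraphDatabase, Values}

wordsArrays.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // One driver/session per partition instead of per record.
    // Connection details below are assumptions, not values from the question.
    val driver = GraphDatabase.driver("bolt://localhost:7687",
      AuthTokens.basic("neo4j", "password"))
    val session = driver.session()
    try {
      partition.foreach { data =>
        val params = Values.parameters(
          "id",  Int.box(data(0).toInt),
          "lat", Double.box(data(1).toDouble),
          "lon", Double.box(data(2).toDouble))
        val result = session.run("my query", params) // placeholder Cypher from the question
        while (result.hasNext) println(result.next().asMap())
      }
    } finally {
      session.close()
      driver.close()
    }
  }
}

Nothing in these closures references the SparkContext, so they serialize cleanly to the executors.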