AuthenticationException Neo4j Spark 流
AuthenticationException Neo4j Spark streaming
我正在使用 Kafka Producer 和 Spark Consumer。我想将主题中的一些数据作为数组传递给消费者,并使用此数据作为参数执行 Neo4j 查询。现在,我想用一组数据测试这个查询。
问题是,当我尝试 运行 我的消费者时,我得到一个例外:
org.neo4j.driver.v1.exceptions.AuthenticationException: Unsupported authentication token, scheme 'none' is only allowed when auth is disabled.
这是我使用 Spark 和 Neo4j 配置的主要方法:
def main(args: Array[String]) {
val sparkSession = SparkSession
.builder()
.appName("KafkaSparkStreaming")
.master("local[*]")
.getOrCreate()
val sparkConf = sparkSession.conf
val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(3))
streamingContext.sparkContext.setLogLevel("ERROR")
val neo4jLocalConfig = ConfigFactory.parseFile(new File("configs/local_neo4j.conf"))
sparkConf.set("spark.neo4j.bolt.url", neo4jLocalConfig.getString("neo4j.url"))
sparkConf.set("spark.neo4j.bolt.user", neo4jLocalConfig.getString("neo4j.user"))
sparkConf.set("spark.neo4j.bolt.password", neo4jLocalConfig.getString("neo4j.password"))
val arr = Array("18731", "41.84000015258789", "-87.62999725341797")
execNeo4jSearchQuery(arr, sparkSession.sparkContext)
streamingContext.start()
streamingContext.awaitTermination()
}
这是我 运行 我的查询的方法:
def execNeo4jSearchQuery(data: Array[String], sc: SparkContext) = {
println("Id: " + data(0) + ", Lat: " + data(1) + ", Lon: " + data(2))
val neo = Neo4j(sc)
val sqlContext = new SQLContext(sc)
val query = "MATCH (m:Member)-[mtg_r:MT_TO_MEMBER]->(mt:MemberTopics)-[mtt_r:MT_TO_TOPIC]->(t:Topic), (t1:Topic)-[tt_r:GT_TO_TOPIC]->(gt:GroupTopics)-[tg_r:GT_TO_GROUP]->(g:Group)-[h_r:HAS]->(e:Event)-[a_r:AT]->(v:Venue) WHERE mt.topic_id = gt.topic_id AND distance(point({ longitude: {lon}, latitude: {lat}}),point({ longitude: v.lon, latitude: v.lat })) < 4000 AND mt.member_id = {id} RETURN g.group_name, e.event_name, v.venue_name"
val df = neo.cypher(query).params(Map("lat" -> data(1).toDouble, "lon" -> data(2).toDouble, "id" -> data(0).toInt))
.partitions(4).batch(25)
.loadDataFrame
}
我检查了它在 Neo4j 中工作正常的查询。那么什么可能导致这个异常呢?
我研究并尝试了各种帮助我找到答案的选项。据我了解,发生此异常是因为我没有正确设置 Neo4j 的 SparkConfig 参数。解决方案是提供 SparkConfig 作为 SparkSession 属性之一。 SparkConfig 应该已经设置了所有 Neo4j 属性
我正在使用 Kafka Producer 和 Spark Consumer。我想将主题中的一些数据作为数组传递给消费者,并使用此数据作为参数执行 Neo4j 查询。现在,我想用一组数据测试这个查询。 问题是,当我尝试 运行 我的消费者时,我得到一个例外:
org.neo4j.driver.v1.exceptions.AuthenticationException: Unsupported authentication token, scheme 'none' is only allowed when auth is disabled.
这是我使用 Spark 和 Neo4j 配置的主要方法:
def main(args: Array[String]) {
val sparkSession = SparkSession
.builder()
.appName("KafkaSparkStreaming")
.master("local[*]")
.getOrCreate()
val sparkConf = sparkSession.conf
val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(3))
streamingContext.sparkContext.setLogLevel("ERROR")
val neo4jLocalConfig = ConfigFactory.parseFile(new File("configs/local_neo4j.conf"))
sparkConf.set("spark.neo4j.bolt.url", neo4jLocalConfig.getString("neo4j.url"))
sparkConf.set("spark.neo4j.bolt.user", neo4jLocalConfig.getString("neo4j.user"))
sparkConf.set("spark.neo4j.bolt.password", neo4jLocalConfig.getString("neo4j.password"))
val arr = Array("18731", "41.84000015258789", "-87.62999725341797")
execNeo4jSearchQuery(arr, sparkSession.sparkContext)
streamingContext.start()
streamingContext.awaitTermination()
}
这是我 运行 我的查询的方法:
def execNeo4jSearchQuery(data: Array[String], sc: SparkContext) = {
println("Id: " + data(0) + ", Lat: " + data(1) + ", Lon: " + data(2))
val neo = Neo4j(sc)
val sqlContext = new SQLContext(sc)
val query = "MATCH (m:Member)-[mtg_r:MT_TO_MEMBER]->(mt:MemberTopics)-[mtt_r:MT_TO_TOPIC]->(t:Topic), (t1:Topic)-[tt_r:GT_TO_TOPIC]->(gt:GroupTopics)-[tg_r:GT_TO_GROUP]->(g:Group)-[h_r:HAS]->(e:Event)-[a_r:AT]->(v:Venue) WHERE mt.topic_id = gt.topic_id AND distance(point({ longitude: {lon}, latitude: {lat}}),point({ longitude: v.lon, latitude: v.lat })) < 4000 AND mt.member_id = {id} RETURN g.group_name, e.event_name, v.venue_name"
val df = neo.cypher(query).params(Map("lat" -> data(1).toDouble, "lon" -> data(2).toDouble, "id" -> data(0).toInt))
.partitions(4).batch(25)
.loadDataFrame
}
我检查了它在 Neo4j 中工作正常的查询。那么什么可能导致这个异常呢?
我研究并尝试了各种帮助我找到答案的选项。据我了解,发生此异常是因为我没有正确设置 Neo4j 的 SparkConfig 参数。解决方案是提供 SparkConfig 作为 SparkSession 属性之一。 SparkConfig 应该已经设置了所有 Neo4j 属性