AuthenticationException Neo4j Spark 流

AuthenticationException Neo4j Spark streaming

我正在使用 Kafka Producer 和 Spark Consumer。我想将主题中的一些数据作为数组传递给消费者,并使用此数据作为参数执行 Neo4j 查询。现在,我想用一组数据测试这个查询。 问题是,当我尝试 运行 我的消费者时,我得到一个例外:

org.neo4j.driver.v1.exceptions.AuthenticationException: Unsupported authentication token, scheme 'none' is only allowed when auth is disabled.

这是我使用 Spark 和 Neo4j 配置的主要方法:

def main(args: Array[String]) {

    val sparkSession = SparkSession
      .builder()
      .appName("KafkaSparkStreaming")
      .master("local[*]")
      .getOrCreate()

     val sparkConf = sparkSession.conf

    val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(3))
    streamingContext.sparkContext.setLogLevel("ERROR")


 val neo4jLocalConfig = ConfigFactory.parseFile(new File("configs/local_neo4j.conf"))
      sparkConf.set("spark.neo4j.bolt.url", neo4jLocalConfig.getString("neo4j.url"))
      sparkConf.set("spark.neo4j.bolt.user", neo4jLocalConfig.getString("neo4j.user"))
      sparkConf.set("spark.neo4j.bolt.password", neo4jLocalConfig.getString("neo4j.password"))

val arr = Array("18731", "41.84000015258789", "-87.62999725341797")
    execNeo4jSearchQuery(arr, sparkSession.sparkContext)

    streamingContext.start()
    streamingContext.awaitTermination()

}

这是我 运行 我的查询的方法:

def execNeo4jSearchQuery(data: Array[String], sc: SparkContext) = {
      println("Id: " + data(0) + ", Lat: " + data(1) + ", Lon: " + data(2))
      val neo = Neo4j(sc)
      val sqlContext = new SQLContext(sc)
      val query = "MATCH (m:Member)-[mtg_r:MT_TO_MEMBER]->(mt:MemberTopics)-[mtt_r:MT_TO_TOPIC]->(t:Topic), (t1:Topic)-[tt_r:GT_TO_TOPIC]->(gt:GroupTopics)-[tg_r:GT_TO_GROUP]->(g:Group)-[h_r:HAS]->(e:Event)-[a_r:AT]->(v:Venue) WHERE mt.topic_id = gt.topic_id AND distance(point({ longitude: {lon}, latitude: {lat}}),point({ longitude: v.lon, latitude: v.lat })) < 4000 AND mt.member_id = {id} RETURN g.group_name, e.event_name, v.venue_name"

    val df = neo.cypher(query).params(Map("lat" -> data(1).toDouble, "lon" -> data(2).toDouble, "id" -> data(0).toInt))
      .partitions(4).batch(25)
      .loadDataFrame
}

我检查了它在 Neo4j 中工作正常的查询。那么什么可能导致这个异常呢?

我研究并尝试了各种帮助我找到答案的选项。据我了解,发生此异常是因为我没有正确设置 Neo4j 的 SparkConfig 参数。解决方案是提供 SparkConfig 作为 SparkSession 属性之一。 SparkConfig 应该已经设置了所有 Neo4j 属性