NullPointerException when trying to read an RDF file using Jena Elephas's TriplesInputFormat in Spark

I am trying to load an RDF file into a Spark RDD using Apache Jena Elephas. The RDF file is in Turtle format. The code is as follows:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.jena.hadoop.rdf.io.input.TriplesInputFormat
import org.apache.hadoop.io.LongWritable
import org.apache.jena.hadoop.rdf.types.TripleWritable
import org.apache.hadoop.conf.Configuration

object RDFTest {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("RDFTest")
      .setMaster("local")

    val sc = new SparkContext(conf)

    val hadoopConf = new Configuration()

    // Read the Turtle file as (LongWritable key, TripleWritable value)
    // pairs using Elephas's format-agnostic TriplesInputFormat
    val rdfTriples = sc.newAPIHadoopFile(
        "file:///media/xinwang/Elements/data/chembl/20.0-uncompressed/cco.ttl",
        classOf[TriplesInputFormat],
        classOf[LongWritable],
        classOf[TripleWritable],
        hadoopConf)

    rdfTriples.take(10).foreach(println)

  }

}

However, when I run the program, it throws a NullPointerException. The log output from the run is as follows:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/11/21 11:58:33 INFO SparkContext: Running Spark version 1.5.2
15/11/21 11:58:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/11/21 11:58:33 WARN Utils: Your hostname, x1 resolves to a loopback address: 127.0.1.1; using 192.168.0.7 instead (on interface wlan0)
15/11/21 11:58:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/11/21 11:58:33 INFO SecurityManager: Changing view acls to: xinwang
15/11/21 11:58:33 INFO SecurityManager: Changing modify acls to: xinwang
15/11/21 11:58:33 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(xinwang); users with modify permissions: Set(xinwang)
15/11/21 11:58:34 INFO Slf4jLogger: Slf4jLogger started
15/11/21 11:58:34 INFO Remoting: Starting remoting
15/11/21 11:58:34 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.0.7:45252]
15/11/21 11:58:34 INFO Utils: Successfully started service 'sparkDriver' on port 45252.
15/11/21 11:58:34 INFO SparkEnv: Registering MapOutputTracker
15/11/21 11:58:34 INFO SparkEnv: Registering BlockManagerMaster
15/11/21 11:58:34 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-1bc925bc-9bc4-45d2-a67a-00b6d9790d85
15/11/21 11:58:34 INFO MemoryStore: MemoryStore started with capacity 919.9 MB
15/11/21 11:58:34 INFO HttpFileServer: HTTP File server directory is /tmp/spark-3a0e0453-af83-4fab-bf33-457d6e5932c7/httpd-047c004d-435f-474f-906c-78ee07e2ae2d
15/11/21 11:58:34 INFO HttpServer: Starting HTTP Server
15/11/21 11:58:34 INFO Utils: Successfully started service 'HTTP file server' on port 39793.
15/11/21 11:58:34 INFO SparkEnv: Registering OutputCommitCoordinator
15/11/21 11:58:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
15/11/21 11:58:34 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
15/11/21 11:58:34 INFO Utils: Successfully started service 'SparkUI' on port 4042.
15/11/21 11:58:34 INFO SparkUI: Started SparkUI at http://192.168.0.7:4042
15/11/21 11:58:34 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
15/11/21 11:58:34 INFO Executor: Starting executor ID driver on host localhost
15/11/21 11:58:35 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45670.
15/11/21 11:58:35 INFO NettyBlockTransferService: Server created on 45670
15/11/21 11:58:35 INFO BlockManagerMaster: Trying to register BlockManager
15/11/21 11:58:35 INFO BlockManagerMasterEndpoint: Registering block manager localhost:45670 with 919.9 MB RAM, BlockManagerId(driver, localhost, 45670)
15/11/21 11:58:35 INFO BlockManagerMaster: Registered BlockManager
15/11/21 11:58:35 INFO MemoryStore: ensureFreeSpace(130896) called with curMem=0, maxMem=964574576
15/11/21 11:58:35 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.8 KB, free 919.8 MB)
15/11/21 11:58:35 INFO MemoryStore: ensureFreeSpace(14349) called with curMem=130896, maxMem=964574576
15/11/21 11:58:35 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 14.0 KB, free 919.8 MB)
15/11/21 11:58:35 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:45670 (size: 14.0 KB, free: 919.9 MB)
15/11/21 11:58:35 INFO SparkContext: Created broadcast 0 from newAPIHadoopFile at RDFTest.scala:22
15/11/21 11:58:35 INFO FileInputFormat: Total input paths to process : 1
15/11/21 11:58:35 INFO SparkContext: Starting job: take at RDFTest.scala:29
15/11/21 11:58:36 INFO DAGScheduler: Got job 0 (take at RDFTest.scala:29) with 1 output partitions
15/11/21 11:58:36 INFO DAGScheduler: Final stage: ResultStage 0(take at RDFTest.scala:29)
15/11/21 11:58:36 INFO DAGScheduler: Parents of final stage: List()
15/11/21 11:58:36 INFO DAGScheduler: Missing parents: List()
15/11/21 11:58:36 INFO DAGScheduler: Submitting ResultStage 0 (file:///media/xinwang/Elements/data/chembl/20.0-uncompressed/cco.ttl NewHadoopRDD[0] at newAPIHadoopFile at RDFTest.scala:22), which has no missing parents
15/11/21 11:58:36 INFO MemoryStore: ensureFreeSpace(1880) called with curMem=145245, maxMem=964574576
15/11/21 11:58:36 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 1880.0 B, free 919.7 MB)
15/11/21 11:58:36 INFO MemoryStore: ensureFreeSpace(1150) called with curMem=147125, maxMem=964574576
15/11/21 11:58:36 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1150.0 B, free 919.7 MB)
15/11/21 11:58:36 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:45670 (size: 1150.0 B, free: 919.9 MB)
15/11/21 11:58:36 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861
15/11/21 11:58:36 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (file:///media/xinwang/Elements/data/chembl/20.0-uncompressed/cco.ttl NewHadoopRDD[0] at newAPIHadoopFile at RDFTest.scala:22)
15/11/21 11:58:36 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/11/21 11:58:36 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 2207 bytes)
15/11/21 11:58:36 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/11/21 11:58:36 INFO NewHadoopRDD: Input split: file:/media/xinwang/Elements/data/chembl/20.0-uncompressed/cco.ttl:0+61178
15/11/21 11:58:36 WARN AbstractLineBasedNodeTupleReader: Configured to ignore bad tuples, parsing errors will be logged and further parsing aborted but no user visible errors will be thrown.  Consider setting rdf.io.input.ignore-bad-tuples to false to disable this behaviour
15/11/21 11:58:36 INFO AbstractLineBasedNodeTupleReader: Got split with start 0 and length 61178 for file with total length of 61178
15/11/21 11:58:36 ERROR AbstractLineBasedNodeTupleReader: Error parsing whole file, aborting further parsing
java.lang.NullPointerException
    at org.apache.jena.riot.lang.LangTurtleBase.directivePrefix(LangTurtleBase.java:162)
    at org.apache.jena.riot.lang.LangTurtleBase.directive(LangTurtleBase.java:139)
    at org.apache.jena.riot.lang.LangTurtleBase.runParser(LangTurtleBase.java:78)
    at org.apache.jena.riot.lang.LangBase.parse(LangBase.java:42)
    at org.apache.jena.riot.RDFParserRegistry$ReaderRIOTLang.read(RDFParserRegistry.java:175)
    at org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileNodeTupleReader.run(AbstractWholeFileNodeTupleReader.java:185)
    at java.lang.Thread.run(Thread.java:745)
15/11/21 11:58:36 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2040 bytes result sent to driver
15/11/21 11:58:36 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 594 ms on localhost (1/1)
15/11/21 11:58:36 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/11/21 11:58:36 INFO DAGScheduler: ResultStage 0 (take at RDFTest.scala:29) finished in 0.605 s
15/11/21 11:58:36 INFO DAGScheduler: Job 0 finished: take at RDFTest.scala:29, took 0.660084 s
15/11/21 11:58:36 INFO SparkContext: Invoking stop() from shutdown hook
15/11/21 11:58:36 INFO SparkUI: Stopped Spark web UI at http://192.168.0.7:4042
15/11/21 11:58:36 INFO DAGScheduler: Stopping DAGScheduler
15/11/21 11:58:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/11/21 11:58:36 INFO MemoryStore: MemoryStore cleared
15/11/21 11:58:36 INFO BlockManager: BlockManager stopped
15/11/21 11:58:36 INFO BlockManagerMaster: BlockManagerMaster stopped
15/11/21 11:58:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
15/11/21 11:58:36 INFO SparkContext: Successfully stopped SparkContext
15/11/21 11:58:36 INFO ShutdownHookManager: Shutdown hook called
15/11/21 11:58:36 INFO ShutdownHookManager: Deleting directory /tmp/spark-3a0e0453-af83-4fab-bf33-457d6e5932c7

When I tried debugging, it showed that at org.apache.jena.riot.lang.LangTurtleBase.directivePrefix(LangTurtleBase.java:162) the call to the getPrefixMap() method returned null.

[Screenshot: the location where the NullPointerException is thrown]

I then found in the Jena source code that during the initialization phase, when the Prologue object is created at RdfIOUtils.createParserProfile(JobContext, Path) line: 60, null is passed as the first argument to the constructor.

[Screenshot: null passed to the Prologue class constructor]

And in the Prologue constructor, the pmap parameter, whose value is null, is simply assigned to this.prefixMap.

org.apache.jena.riot.Prologue.java, line 54:

public Prologue(PrefixMap pmap, IRIResolver resolver)
{
    this.prefixMap = pmap ; 
    this.resolver = resolver ;
}

I suppose this is why the getPrefixMap() method returns null.
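
To illustrate, the failure seems to boil down to something like the following minimal sketch, pieced together from the stack trace and the quoted constructor above. This is hypothetical code, not the actual Elephas code path; the class and package names are taken from the question, and the prefix binding is a placeholder:

// Hypothetical sketch of the failure mode; not the actual Jena source.
import org.apache.jena.riot.Prologue;

public class NpeSketch {
    public static void main(String[] args) {
        // Elephas's RdfIOUtils.createParserProfile passes null as the first
        // constructor argument, so this.prefixMap ends up null:
        Prologue prologue = new Prologue(null, null);
        // LangTurtleBase.directivePrefix later registers an @prefix binding
        // on the prefix map, roughly like this, which throws the NPE:
        prologue.getPrefixMap().add("ex", "http://example.org/");
    }
}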

But what I really want to know is how to work around this problem.

Any help is much appreciated.

You could try replacing the TriplesInputFormat with the TurtleInputFormat from the org.apache.jena.hadoop.rdf.io.input.turtle package. Hope this helps.
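
An untested sketch of that change against the code in the question (only the import and the InputFormat class change; the key and value types stay the same):

import org.apache.jena.hadoop.rdf.io.input.turtle.TurtleInputFormat

// Same call as before, but with the Turtle-specific InputFormat
val rdfTriples = sc.newAPIHadoopFile(
    "file:///media/xinwang/Elements/data/chembl/20.0-uncompressed/cco.ttl",
    classOf[TurtleInputFormat],
    classOf[LongWritable],
    classOf[TripleWritable],
    hadoopConf)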

This is a bug in Elephas, filed as JENA-1075, and it has now been fixed.

The bug only affects Turtle input, so you can avoid it by converting your input data into a format other than Turtle.
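
For example, assuming Jena's command-line tools are installed, the riot tool can rewrite the Turtle file as N-Triples, which you would then read with the NTriplesInputFormat from org.apache.jena.hadoop.rdf.io.input.ntriples instead (the output path here is arbitrary):

# Convert Turtle to N-Triples so the Turtle-specific bug is never hit
riot --output=ntriples cco.ttl > cco.nt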

Alternatively, you can point your Maven build at the latest development snapshot (3.0.1-SNAPSHOT at the time of writing), which will download a build that includes the fix.
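
A sketch of what that might look like in a pom.xml (the snapshots repository URL and the choice of the jena-elephas-io artifact are assumptions; adjust them to your setup):

<!-- Hypothetical pom.xml fragment; the repository URL is an assumption -->
<repositories>
  <repository>
    <id>apache-snapshots</id>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>org.apache.jena</groupId>
    <artifactId>jena-elephas-io</artifactId>
    <version>3.0.1-SNAPSHOT</version>
  </dependency>
</dependencies>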