Error writing to MongoDB from Pig

I am trying to use the mongo-hadoop connector, with either Pig or streaming, to load data from and store data to MongoDB. With Pig I have the following problem:

$ cat process.pig

REGISTER /usr/hdp/2.2.4.2-2/hadoop/lib/mongo-java-driver-3.0.2.jar
REGISTER /usr/hdp/2.2.4.2-2/hadoop/lib/mongo-hadoop-core-1.4.0.jar
REGISTER /usr/hdp/2.2.4.2-2/hadoop/lib/mongo-hadoop-pig-1.4.0.jar

SET mapreduce.map.speculative false
SET mapreduce.reduce.speculative false
SET mapreduce.fileoutputcommitter.marksuccessfuljobs false
SET mongo.auth.uri 'mongodb://hadoop:password@127.0.0.1:27017/admin'

raw = LOAD 'mongodb://hadoop:password@127.0.0.1:27017/hadoop.collection'
USING com.mongodb.hadoop.pig.MongoLoader('id:chararray, t:chararray, c_s:map[]');

Writing the data to a BSON file with
STORE raw
INTO 'file:///tmp/pig_without_limit_bson'
USING com.mongodb.hadoop.pig.BSONStorage('id');

works, and I can import the resulting file using mongorestore.

Writing to MongoDB with
STORE raw
INTO 'mongodb://hadoop:password@127.0.0.1:27017/hadoop.out' 
USING com.mongodb.hadoop.pig.MongoInsertStorage('id:chararray, t:chararray', 'id');

does not work and produces the following error:

Input(s):
Failed to read data from     "mongodb://hadoop:password@127.0.0.1:27017/hadoop.collection"
Output(s):
Failed to produce result in "mongodb://hadoop:password@127.0.0.1:27017/hadoop.out"

$ cat pig.log

Error: java.lang.IllegalStateException: state should be: open
    at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70)
    at com.mongodb.connection.BaseCluster.selectServer(BaseCluster.java:79)
    at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:75)
    at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:71)
    at com.mongodb.binding.ClusterBinding.getWriteConnectionSource(ClusterBinding.java:68)
    at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:175)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:141)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:72)
    at com.mongodb.Mongo.execute(Mongo.java:745)
    at com.mongodb.Mongo.execute(Mongo.java:728)
    at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:1968)
    at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:1962)
    at com.mongodb.BulkWriteOperation.execute(BulkWriteOperation.java:98)
    at com.mongodb.hadoop.output.MongoOutputCommitter.commitTask(MongoOutputCommitter.java:133)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter.commitTask(PigOutputCommitter.java:356)
    at org.apache.hadoop.mapred.Task.commit(Task.java:1163)
    at org.apache.hadoop.mapred.Task.done(Task.java:1025)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:345)
    at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

Pig Stack Trace
---------------
ERROR 0: java.io.IOException: No FileSystem for scheme: mongodb

org.apache.pig.backend.executionengine.ExecException: ERROR 0: java.io.IOException: No FileSystem for scheme: mongodb
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher.launchPig(MapReduceLauncher.java:535)
    at org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.launchPig(HExecutionEngine.java:280)
    at org.apache.pig.PigServer.launchPlan(PigServer.java:1390)
    at org.apache.pig.PigServer.executeCompiledLogicalPlan(PigServer.java:1375)
    at org.apache.pig.PigServer.execute(PigServer.java:1364)
    at org.apache.pig.PigServer.executeBatch(PigServer.java:415)
    at org.apache.pig.PigServer.executeBatch(PigServer.java:398)
    at org.apache.pig.tools.grunt.GruntParser.executeBatch(GruntParser.java:171)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:234)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205)
    at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:81)
    at org.apache.pig.Main.run(Main.java:495)
    at org.apache.pig.Main.main(Main.java:170)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.io.IOException: No FileSystem for scheme: mongodb
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2607)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2614)
    at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2653)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2635)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.pig.StoreFunc.cleanupOnFailureImpl(StoreFunc.java:193)
    at org.apache.pig.StoreFunc.cleanupOnFailure(StoreFunc.java:161)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher.launchPig(MapReduceLauncher.java:526)
... 18 more

However, if the LIMIT operator is applied first (even with a huge limit), all documents are saved to MongoDB.

raw_limited = limit raw 1000000;
STORE raw_limited
INTO 'mongodb://hadoop:password@127.0.0.1:27017/hadoop.out' 
USING com.mongodb.hadoop.pig.MongoInsertStorage('id:chararray, t:chararray', 'id');

results in

Input(s):
Successfully read 100 records (638 bytes) from:
Output(s):
Successfully stored 100 records (18477 bytes) in:

$mongo hadoop

 >> db.out.count()
100

Why does this happen, and how can I fix it? Am I missing something?

This seems to be a bug in the mongo java driver. With version 3.0.4 of the mongo java driver, it works.
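
For reference, the change would probably be limited to the first REGISTER line of process.pig. This is only a sketch: it assumes the 3.0.4 driver jar has been copied into the same directory as the other jars and keeps the standard file name mongo-java-driver-3.0.4.jar, so adjust the path to wherever the jar actually lives.

```pig
-- Assumption: the 3.0.4 driver jar sits next to the connector jars.
REGISTER /usr/hdp/2.2.4.2-2/hadoop/lib/mongo-java-driver-3.0.4.jar
-- The connector jars stay as in the question.
REGISTER /usr/hdp/2.2.4.2-2/hadoop/lib/mongo-hadoop-core-1.4.0.jar
REGISTER /usr/hdp/2.2.4.2-2/hadoop/lib/mongo-hadoop-pig-1.4.0.jar
```

Since the 3.0.2 jar is in the Hadoop lib directory, it may also need to be removed from there so that the older driver is not picked up first.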