如何使用 MongoSpark 和 JavaRdd 在 java 中执行 MapReduce

How to do a MapReduce in java with MongoSpark and JavaRdd

我正在尝试在 java 中使用 MongoSpark 和 rdd (JavaMongoRdd) 进行 mapReduce。所以目前,我能够在我的 Rdd 中检索我的 mongo 文档,但我不知道之后如何进行。事实上,我的文档中有一个日期字段,我想使用该日期中的年份来执行我的 mapReduce,但我没有找到任何关于如何执行此操作的信息。所以我在这里问你是否有一些文档、教程甚至是如何进行的示例。

这里是代码,我试图用 Mongo 文件和年份来配对 Rdd,以计算每年的文件数量,但我不知道这是否是我的方式继续

 public String count() {
    JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());
    JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);
    logger.info("test 1 :" + rdd.count());
    logger.info("test 2 :" + rdd.first().toJson());

    /*JavaMongoRDD<Document> newRdd = rdd.withPipeline(
            Collections.singletonList(
                    Document.parse("{ $match: { _id : { $gt : ObjectId(\"5c9e180cdba48525f0df30b9\") } } }")
            )
    );*/

    //logger.info("test 2.5 :" +newRdd.first());

    JavaPairRDD<String, Document> pairRdd = rdd
            .mapToPair((document) -> new Tuple2(document.getString("date").split(".")[1], document));
    logger.info("test 3 :" + pairRdd.first());
    //logger.info("test 2 :" + rdd.first().toJson());
    //ar
    //logger.info("test spark");
    return "test";
}

我的 MongoDb 文件看起来像这样

        "_id" : ObjectId("5c9e180ddba48525f0df30cb"),
    "title" : "Redevance: une perte de compétitivité pour l’hydraulique suisse",
    "description" : [
            "Le Parlement a bouclé, durant cette session de printemps, la révision de la loi sur les forces hydrauliques. La solution adoptée aboutit au statu quo sur le plan de la redevance hydraulique. Le taux maximal de cette taxe reste ainsi fixé à 110 francs par kilowatt théorique, jusqu'à fin 2024. Les..."
    ],
    "date" : "dimanche, 24. mars 2019"

看起来你想做这样的事情。

JavaPairRDD<String, Long> pairRdd = rdd.mapToPair((document) ->{
   String date = document.getString("date");
   String year = date.split(" ")[date.split(" ").length-1];// get the year
   return new Tuple2(year,1L);  //create pair of year and 1L the count for this row. 
}
JavaPairRDD<String, Long> counts = pairRdd.reduceByKey((a, b) -> a + b);// for all matching keys in the list accumulate the value

计数应该是像 1999->30、2000->24 这样的地图...

你也可以这样过年。

SimpleDateFormat df = new SimpleDateFormat( "EEEE, dd. MMMM yyyy", Locale.FRANCE);
LocalDate d = df.parse(date).toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
System.out.println(d.getYear());