将 RDD 转换为 Map 列表

Transforming RDD to List of Map

我需要将每个 RDD 转换为 NavigableMap 并存储在匿名函数中的 List<NavigableMap<byte[], List<Map<String, String>>>> 中。我正在 JavaPairDStream<ImmutableBytesWritable, Put> puts 上工作。

我目前拥有的:

puts.foreachRDD(r ->
    List<NavigableMap<byte[], List<Cell>>> l = r.map(t ->
        t._2().getFamilyCellMap()).collect();
    return null;
});

这会在 .collect() 处抛出 NotSerializableException,因为 Cell 不可序列化。

所以我需要以某种方式将 Cell 转换为匿名函数中的 Map<String, String 和 return List<NavigableMap<byte[], List<Map<String, String>>>> 以便我使用 .collect().

没有多少 Java 8 经验,我几乎被困在这一点上。任何帮助将不胜感激。

这就是我最终的处理方式。在 collecting 之前我需要 primitives。我试图收集 .getFamilyCellMap() 的 return 值,它是 map 而不是 serializable

然后我把它变回 assertions 之前的 map

这里是 Java 代码:

puts.foreachRDD(r -> {
        List<String> l = r.flatMap(t -> {
                    Collection<List<Cell>> collection = t._2().getFamilyCellMap().values();
            return collection.stream()
                    .flatMap(Collection::stream)
                    .map(CellUtil::cloneValue))
                    .collect(Collectors.toList());
                }).collect();

        //Mapping for testing
        ObjectMapper objectMapper = new ObjectMapper();
        Map<String, Object> map1 = objectMapper.readValue(l.get(1), new TypeReference<Map<String, Object>>(){});
        Map<String, Object> map2 = objectMapper.readValue(l.get(2), new TypeReference<Map<String, Object>>(){});

        System.out.println(map1);
        Assert.assertEquals(map1.get("attribute1").toString(), expected1);
        //etc
        System.out.println(map2);
        Assert.assertEquals(map2.get("attribute2").toString(), expected2);
        //etc
    }
    return null;
});

希望这对有需要的人有所帮助。