将 RDD 转换为 Map 列表
Transforming RDD to List of Map
我需要将每个 RDD 转换为 NavigableMap 并存储在匿名函数中的 List<NavigableMap<byte[], List<Map<String, String>>>>
中。我正在 JavaPairDStream<ImmutableBytesWritable, Put> puts
上工作。
我目前拥有的:
puts.foreachRDD(r ->
List<NavigableMap<byte[], List<Cell>>> l = r.map(t ->
t._2().getFamilyCellMap()).collect();
return null;
});
这会在 .collect()
处抛出 NotSerializableException,因为 Cell
不可序列化。
所以我需要以某种方式将 Cell
转换为匿名函数中的 Map<String, String
和 return List<NavigableMap<byte[], List<Map<String, String>>>>
以便我使用 .collect()
.
没有多少 Java 8 经验,我几乎被困在这一点上。任何帮助将不胜感激。
这就是我最终的处理方式。在 collecting
之前我需要 primitives
。我试图收集 .getFamilyCellMap()
的 return 值,它是 map
而不是 serializable
。
然后我把它变回 assertions
之前的 map
。
这里是 Java
代码:
puts.foreachRDD(r -> {
List<String> l = r.flatMap(t -> {
Collection<List<Cell>> collection = t._2().getFamilyCellMap().values();
return collection.stream()
.flatMap(Collection::stream)
.map(CellUtil::cloneValue))
.collect(Collectors.toList());
}).collect();
//Mapping for testing
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> map1 = objectMapper.readValue(l.get(1), new TypeReference<Map<String, Object>>(){});
Map<String, Object> map2 = objectMapper.readValue(l.get(2), new TypeReference<Map<String, Object>>(){});
System.out.println(map1);
Assert.assertEquals(map1.get("attribute1").toString(), expected1);
//etc
System.out.println(map2);
Assert.assertEquals(map2.get("attribute2").toString(), expected2);
//etc
}
return null;
});
希望这对有需要的人有所帮助。
我需要将每个 RDD 转换为 NavigableMap 并存储在匿名函数中的 List<NavigableMap<byte[], List<Map<String, String>>>>
中。我正在 JavaPairDStream<ImmutableBytesWritable, Put> puts
上工作。
我目前拥有的:
puts.foreachRDD(r ->
List<NavigableMap<byte[], List<Cell>>> l = r.map(t ->
t._2().getFamilyCellMap()).collect();
return null;
});
这会在 .collect()
处抛出 NotSerializableException,因为 Cell
不可序列化。
所以我需要以某种方式将 Cell
转换为匿名函数中的 Map<String, String
和 return List<NavigableMap<byte[], List<Map<String, String>>>>
以便我使用 .collect()
.
没有多少 Java 8 经验,我几乎被困在这一点上。任何帮助将不胜感激。
这就是我最终的处理方式。在 collecting
之前我需要 primitives
。我试图收集 .getFamilyCellMap()
的 return 值,它是 map
而不是 serializable
。
然后我把它变回 assertions
之前的 map
。
这里是 Java
代码:
puts.foreachRDD(r -> {
List<String> l = r.flatMap(t -> {
Collection<List<Cell>> collection = t._2().getFamilyCellMap().values();
return collection.stream()
.flatMap(Collection::stream)
.map(CellUtil::cloneValue))
.collect(Collectors.toList());
}).collect();
//Mapping for testing
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> map1 = objectMapper.readValue(l.get(1), new TypeReference<Map<String, Object>>(){});
Map<String, Object> map2 = objectMapper.readValue(l.get(2), new TypeReference<Map<String, Object>>(){});
System.out.println(map1);
Assert.assertEquals(map1.get("attribute1").toString(), expected1);
//etc
System.out.println(map2);
Assert.assertEquals(map2.get("attribute2").toString(), expected2);
//etc
}
return null;
});
希望这对有需要的人有所帮助。