ArrayList is empty after JavaRDD<String>.foreach in Spark

Sample JSON (100 records in total):

{"name":"dev","salary":10000,"occupation":"engg","address":"noida"}
{"name":"karthik","salary":20000,"occupation":"engg","address":"noida"}

Relevant code:

   final List<Map<String,String>> jsonData = new ArrayList<>();

   DataFrame df =  sqlContext.read().json("file:///home/dev/data-json/emp.json");
   JavaRDD<String> rdd = df.repartition(1).toJSON().toJavaRDD();

   rdd.foreach(new VoidFunction<String>() {
       @Override
       public void call(String line)  {
           try {
               jsonData.add(new ObjectMapper().readValue(line, Map.class));
               System.out.println(Thread.currentThread().getName());
               System.out.println("List size: "+jsonData.size());
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
   });

   System.out.println(Thread.currentThread().getName());
   System.out.println("List size: "+jsonData.size());

jsonData ends up empty.

Output:

Executor task launch worker-1
List size: 1
Executor task launch worker-1
List size: 2
Executor task launch worker-1
List size: 3
.
.
.
Executor task launch worker-1
List size: 100

main
List size: 0

The foreach runs inside executor tasks: Spark serializes the closure and ships it to the executors, so each task mutates its own deserialized copy of jsonData, and the list on the driver is never touched. Instead, transform with map and bring the results back to the driver with collect. I have tested this and it works: https://github.com/freedev/spark-test

import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

final ObjectMapper objectMapper = new ObjectMapper();

List<Map<String, Object>> list = rdd
        .map(new org.apache.spark.api.java.function.Function<String, Map<String, Object>>() {
            @Override
            public Map<String, Object> call(String line) throws Exception {
                TypeReference<Map<String, Object>> typeRef = new TypeReference<Map<String, Object>>() {
                };
                Map<String, Object> rs = objectMapper.readValue(line, typeRef);
                return rs;
            }
        }).collect();

I prefer mapping to Map<String, Object>, because it handles the cases where a JSON value is not a string (e.g. "salary":20000).
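The underlying behavior can be illustrated without Spark at all. A minimal JDK-only sketch (the class name and helper are illustrative, not from Spark's API): round-tripping the captured list through Java serialization mimics what happens when Spark ships the foreach closure to an executor, and shows why mutations never reach the driver's original list.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

// Plain-Java illustration of why the foreach above leaves jsonData empty:
// Spark serializes the closure, so each executor task operates on a
// serialized COPY of the captured list, never the driver's original.
public class ClosureCopyDemo {

    // Round-trip an object through Java serialization, analogous to the
    // mechanism Spark uses to ship a closure to an executor.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T serializedCopy(T obj) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ArrayList<String> driverList = new ArrayList<>();

        // "Executor" side: mutate the deserialized copy, as the foreach does.
        ArrayList<String> executorCopy = serializedCopy(driverList);
        executorCopy.add("{\"name\":\"dev\"}");

        System.out.println("executor copy size: " + executorCopy.size()); // 1
        System.out.println("driver list size: " + driverList.size());     // 0
    }
}
```

This is also why map + collect works: collect explicitly serializes the per-partition results back to the driver, rather than relying on a side effect against a captured variable.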