How to extract an RDD's data into a Java ArrayList?
The obvious idea is to add the elements one by one:
ArrayList<String> myvalues = new ArrayList<String>();
myRdd.foreach(new VoidFunction<org.apache.spark.sql.api.java.Row>() {
    @Override
    public void call(org.apache.spark.sql.api.java.Row row) throws Exception {
        myvalues.add(row.getString(0)); // say I need only the first element
    }
});
This, and every alternative I have tried, keeps throwing org.apache.spark.SparkException: Task not serializable. I simplified the function further; apparently I am doing something fundamentally wrong:
LOG.info("Let's see..");
queryRdd.foreach(new VoidFunction<org.apache.spark.sql.api.java.Row>() {
    @Override
    public void call(org.apache.spark.sql.api.java.Row row) throws Exception {
        LOG.info("Value is : " + row.getString(0));
    }
});
There must be a simple way to do this. Here is the stack trace for reference:
2015-10-08 10:16:48 INFO UpdateStatementTemplateImpl:141 - Lets see..
2015-10-08 10:16:48 WARN GenericExceptionMapper:20 - Error while executing service
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1476)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:781)
at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:313)
at org.apache.spark.sql.api.java.JavaSchemaRDD.foreach(JavaSchemaRDD.scala:42)
at com.simility.cassandra.template.DeviceIDTemplateImpl.test(DeviceIDTemplateImpl.java:144)
at com.kumbay.service.admin.BusinessEntityService.testSignal(BusinessEntityService.java:1801)
at com.kumbay.service.admin.BusinessEntityService$$FastClassByCGLIB$7ddd50.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:701)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.transaction.interceptor.TransactionInterceptor.proceedWithInvocation(TransactionInterceptor.java:96)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:260)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:94)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.security.access.intercept.aopalliance.MethodSecurityInterceptor.invoke(MethodSecurityInterceptor.java:64)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:634)
I assume LOG and myvalues live in the enclosing class. So the whole class, as part of the "capture" of call, would have to be serialized, and that is not possible.
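This capture behavior can be reproduced with plain Java serialization, no Spark needed. A minimal sketch (all class and method names below are made up for illustration): an anonymous inner class that reads an outer field keeps a hidden `this$0` reference to the enclosing instance, so serializing it fails when the outer class is not `Serializable`, while a static nested class carries no such reference and serializes fine.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {
    // Not Serializable itself, like a typical service bean holding LOG/myvalues.
    private final String field = "hello";

    // Anonymous class: reading `field` forces a hidden reference to the
    // enclosing CaptureDemo instance, which gets dragged into serialization.
    public Serializable capturing() {
        return new Serializable() {
            @Override
            public String toString() {
                return field;
            }
        };
    }

    // Static nested class: no reference to the outer instance.
    static class StandaloneTask implements Serializable {
    }

    // Try to serialize an object; report success instead of throwing.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false; // NotSerializableException lands here
        }
    }

    public static void main(String[] args) {
        CaptureDemo demo = new CaptureDemo();
        System.out.println(serializes(demo.capturing()));     // false
        System.out.println(serializes(new StandaloneTask())); // true
    }
}
```

Spark's ClosureCleaner runs essentially this check on every closure before shipping it to executors, which is where the `Task not serializable` exception in the stack trace above comes from.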
Solution
First, replace LOG with a plain System.out.println and see whether that works.
Second, make local copies of the members you use inside the call:
public void call(...) {
    Log log = LOG;  // or
    ArrayList<String> inside = myvalues;
    inside.add(...);
}
Third, never use an ArrayList inside foreach: it runs on different nodes, and each node sees its own copy of the ArrayList, so you will never get the result you expect. Instead, use rdd.collect(...) to gather your results!
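A sketch of the collect-based approach. The Spark call in the comment follows the old 1.x Java API used in the question (roughly, since exact signatures may differ by version); the runnable part uses plain Java streams as a local stand-in for the RDD just to show the shape, and the name `firstColumn` is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class CollectDemo {
    // On a real RDD the call would look roughly like this (not runnable here
    // without a SparkContext):
    //
    //   List<String> values = myRdd
    //       .map(row -> row.getString(0)) // extract only the first column
    //       .collect();                   // ship results back to the driver
    //
    // The key point: collect() gathers the per-partition results on the
    // driver, so no shared ArrayList is mutated inside the closure. The same
    // shape with streams standing in for the RDD:
    public static List<String> firstColumn(List<String[]> rows) {
        return rows.stream()
                .map(row -> row[0])
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[]{"a", "1"});
        rows.add(new String[]{"b", "2"});
        System.out.println(firstColumn(rows)); // prints [a, b]
    }
}
```

Note that collect() pulls the entire result set into driver memory, so it is only appropriate when the extracted data is small enough to fit there.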