使用 Py4J 调用采用 JavaSparkContext 和 return JavaRDD<Integer> 的方法
Using Py4J to invoke a method that takes a JavaSparkContext and return a JavaRDD<Integer>
我正在寻找一些帮助或示例代码来说明 pyspark 调用用户在 spark 本身之外编写的 Java 代码,该代码从 Python 获取 spark 上下文,然后 returns 一个 RDD内置 Java。
为了完整起见,我使用的是 Py4J 0.81、Java 8、Python 2.7 和 spark 1.3.1
这是我用于 Python 一半的内容:
import pyspark
sc = pyspark.SparkContext(master='local[4]',
appName='HelloWorld')
print "version", sc._jsc.version()
from py4j.java_gateway import JavaGateway
gateway = JavaGateway()
print gateway.entry_point.getRDDFromSC(sc._jsc)
Java部分是:
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import py4j.GatewayServer;
public class HelloWorld
{
public JavaRDD<Integer> getRDDFromSC(JavaSparkContext jsc)
{
JavaRDD<Integer> result = null;
if (jsc == null)
{
System.out.println("XXX Bad mojo XXX");
return result;
}
int n = 10;
List<Integer> l = new ArrayList<Integer>(n);
for (int i = 0; i < n; i++)
{
l.add(i);
}
result = jsc.parallelize(l);
return result;
}
public static void main(String[] args)
{
HelloWorld app = new HelloWorld();
GatewayServer server = new GatewayServer(app);
server.start();
}
}
运行 在 Python 侧产生:
$ spark-1.3.1-bin-hadoop1/bin/spark-submit main.py
version 1.3.1
sc._jsc <class 'py4j.java_gateway.JavaObject'>
org.apache.spark.api.java.JavaSparkContext@50418105
None
Java方报道:
$ spark-1.3.1-bin-hadoop1/bin/spark-submit --class "HelloWorld" --master local[4] target/hello-world-1.0.jar
XXX Bad mojo XXX
问题似乎是我没有正确地将 JavaSparkContext
从 Python 传递到 Java。当我使用 from python sc._scj.sc()
.
时,也会发生 JavaRDD
为 null
的相同故障
调用使用来自 Python 的 spark 的用户定义 Java 代码的正确方法是什么?
所以我在为 Sparkling Pandas The branch lives at https://github.com/holdenk/sparklingpandas/tree/add-kurtosis-support and the PR is at https://github.com/sparklingpandas/sparklingpandas/pull/90 工作的分支中有一个这样的例子。
目前看来您有两个不同的网关服务器,这似乎可能会导致一些问题,您可以只使用现有的网关服务器并执行类似以下操作:
sc._jvm.what.ever.your.class.package.is.HelloWorld.getRDDFromSC(sc._jsc)
假设您也将其设为静态方法。
我正在寻找一些帮助或示例代码来说明 pyspark 调用用户在 spark 本身之外编写的 Java 代码,该代码从 Python 获取 spark 上下文,然后 returns 一个 RDD内置 Java。
为了完整起见,我使用的是 Py4J 0.81、Java 8、Python 2.7 和 spark 1.3.1
这是我用于 Python 一半的内容:
import pyspark
sc = pyspark.SparkContext(master='local[4]',
appName='HelloWorld')
print "version", sc._jsc.version()
from py4j.java_gateway import JavaGateway
gateway = JavaGateway()
print gateway.entry_point.getRDDFromSC(sc._jsc)
Java部分是:
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import py4j.GatewayServer;
public class HelloWorld
{
public JavaRDD<Integer> getRDDFromSC(JavaSparkContext jsc)
{
JavaRDD<Integer> result = null;
if (jsc == null)
{
System.out.println("XXX Bad mojo XXX");
return result;
}
int n = 10;
List<Integer> l = new ArrayList<Integer>(n);
for (int i = 0; i < n; i++)
{
l.add(i);
}
result = jsc.parallelize(l);
return result;
}
public static void main(String[] args)
{
HelloWorld app = new HelloWorld();
GatewayServer server = new GatewayServer(app);
server.start();
}
}
运行 在 Python 侧产生:
$ spark-1.3.1-bin-hadoop1/bin/spark-submit main.py
version 1.3.1
sc._jsc <class 'py4j.java_gateway.JavaObject'>
org.apache.spark.api.java.JavaSparkContext@50418105
None
Java方报道:
$ spark-1.3.1-bin-hadoop1/bin/spark-submit --class "HelloWorld" --master local[4] target/hello-world-1.0.jar
XXX Bad mojo XXX
问题似乎是我没有正确地将 JavaSparkContext
从 Python 传递到 Java。当我使用 from python sc._scj.sc()
.
JavaRDD
为 null
的相同故障
调用使用来自 Python 的 spark 的用户定义 Java 代码的正确方法是什么?
所以我在为 Sparkling Pandas The branch lives at https://github.com/holdenk/sparklingpandas/tree/add-kurtosis-support and the PR is at https://github.com/sparklingpandas/sparklingpandas/pull/90 工作的分支中有一个这样的例子。
目前看来您有两个不同的网关服务器,这似乎可能会导致一些问题,您可以只使用现有的网关服务器并执行类似以下操作:
sc._jvm.what.ever.your.class.package.is.HelloWorld.getRDDFromSC(sc._jsc)
假设您也将其设为静态方法。