BroadCast 变量在 Spark 程序中发布
BroadCast Variable publish in Spark Program
在 spark - java 程序中,我需要读取一个配置文件并填充一个 HashMap ,我需要将其发布为广播变量,以便它可以在所有数据节点上使用。
我需要在 CustomInputFormat class 中获取此广播变量的值,它将发送到数据节点中的 运行。我如何在我的 CustomInputFormat class 中指定以从特定广播变量获取值,因为广播变量是在我的驱动程序中声明的?
我正在添加一些代码来详细解释它:
在这种情况下1,我在驱动程序本身中使用它,即变量在同一个 class 中使用:在这里我可以使用 Broadcat.value() 方法
> final Broadcast<String[]> signPrefixes =
> sc.broadcast(loadCallSignTable());
> JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(
> new PairFunction<Tuple2<String, Integer>, String, Integer> (){
> public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
> String sign = callSignCount._1();
> String country = lookupCountry(sign, signPrefixes.value());
> return new Tuple2(country, callSignCount._2());
> }}).reduceByKey(new SumInts());
在场景 2 中,我将在我的自定义输入格式中使用广播变量 class :
驱动程序:
> final JavaSparkContext sc= new
> JavaSparkContext(sConf.setAppName("ParserSpark").setMaster("yarn-cluster"));
> Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
>
> JavaPairRDD<NullWritable, ArrayList<Record>> baseRDD =
> sc.newAPIHadoopFile(args[2], InputFormat.class, NullWritable.class,
> ArrayList.class, conf);
InputFormat.class
> public class InputFormat extends FileInputFormat {
>
> @Override public RecordReader<NullWritable, ArrayList<Record>>
> createRecordReader(InputSplit split, TaskAttemptContext context)
> throws IOException, InterruptedException{
> //I want to get the Broadcast Variable Here -- How will I do it
>
> RecordReader reader = new RecordReader(); reader.initialize(split, context); return reader; } @Override
> protected boolean isSplitable(JobContext context, Path file) {
> return false; } }
您可以在 val bcVariable = sc.broadcast(myVariableToBroadcast)
的驱动程序上创建广播 var,稍后使用 bcVariable.value
访问它
我最近 运行 对此感兴趣。结束其实很简单(几个小时后然后...哈!)
创建一个新的配置,设置您的变量,并将其传递给 newAPIHadoopFile 函数的一个稍微不同的实现。
来自驱动程序(这里使用Scala):
val myConf = new Configuration();
myConf.set("var1", v1)
myConf.set("var2", v2)
myConf.set("var3", v3)
val yourFile = sc.newAPIHadoopFile("yourFilePath", classOf[MyFileInputFormat],classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.DoubleWritable],myConf)
从您的 InputFormat 或 InputReader.. 或者您有上下文的任何地方(Java 这次)
context.getConfiguration().get("var1");
或者
job.getConfiguration().get("var2");
在 spark - java 程序中,我需要读取一个配置文件并填充一个 HashMap ,我需要将其发布为广播变量,以便它可以在所有数据节点上使用。
我需要在 CustomInputFormat class 中获取此广播变量的值,它将发送到数据节点中的 运行。我如何在我的 CustomInputFormat class 中指定以从特定广播变量获取值,因为广播变量是在我的驱动程序中声明的?
我正在添加一些代码来详细解释它:
在这种情况下1,我在驱动程序本身中使用它,即变量在同一个 class 中使用:在这里我可以使用 Broadcat.value() 方法
> final Broadcast<String[]> signPrefixes =
> sc.broadcast(loadCallSignTable());
> JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(
> new PairFunction<Tuple2<String, Integer>, String, Integer> (){
> public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
> String sign = callSignCount._1();
> String country = lookupCountry(sign, signPrefixes.value());
> return new Tuple2(country, callSignCount._2());
> }}).reduceByKey(new SumInts());
在场景 2 中,我将在我的自定义输入格式中使用广播变量 class :
驱动程序:
> final JavaSparkContext sc= new
> JavaSparkContext(sConf.setAppName("ParserSpark").setMaster("yarn-cluster"));
> Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
>
> JavaPairRDD<NullWritable, ArrayList<Record>> baseRDD =
> sc.newAPIHadoopFile(args[2], InputFormat.class, NullWritable.class,
> ArrayList.class, conf);
InputFormat.class
> public class InputFormat extends FileInputFormat {
>
> @Override public RecordReader<NullWritable, ArrayList<Record>>
> createRecordReader(InputSplit split, TaskAttemptContext context)
> throws IOException, InterruptedException{
> //I want to get the Broadcast Variable Here -- How will I do it
>
> RecordReader reader = new RecordReader(); reader.initialize(split, context); return reader; } @Override
> protected boolean isSplitable(JobContext context, Path file) {
> return false; } }
您可以在 val bcVariable = sc.broadcast(myVariableToBroadcast)
的驱动程序上创建广播 var,稍后使用 bcVariable.value
我最近 运行 对此感兴趣。结束其实很简单(几个小时后然后...哈!)
创建一个新的配置,设置您的变量,并将其传递给 newAPIHadoopFile 函数的一个稍微不同的实现。
来自驱动程序(这里使用Scala):
val myConf = new Configuration();
myConf.set("var1", v1)
myConf.set("var2", v2)
myConf.set("var3", v3)
val yourFile = sc.newAPIHadoopFile("yourFilePath", classOf[MyFileInputFormat],classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.DoubleWritable],myConf)
从您的 InputFormat 或 InputReader.. 或者您有上下文的任何地方(Java 这次)
context.getConfiguration().get("var1");
或者
job.getConfiguration().get("var2");