Use SparkContext hadoop configuration within RDD methods/closures, like foreachPartition
I am using Spark to read a bunch of files, process them, and then save them all as sequence files. What I want is one sequence file per partition, so I did this:
SparkConf sparkConf = new SparkConf().setAppName("writingHDFS")
        .setMaster("local[2]")
        .set("spark.streaming.stopGracefullyOnShutdown", "true");
final JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.hadoopConfiguration().addResource(hdfsConfPath + "hdfs-site.xml");
jsc.hadoopConfiguration().addResource(hdfsConfPath + "core-site.xml");
//JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5*1000));

JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(sourcePath);
if(!imageByteRDD.isEmpty())
    imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,PortableDataStream>>>() {
        @Override
        public void call(Iterator<Tuple2<String, PortableDataStream>> arg0)
                throws Exception {
            [°°°SOME STUFF°°°]
            SequenceFile.Writer writer = SequenceFile.createWriter(
                    jsc.hadoopConfiguration(),
                    //here lies the problem: how to pass the hadoopConfiguration I have put inside the Spark Context?
Previously I created a Configuration for each partition, and it works, but I'm sure there is a much more "sparky" way.

Does anyone know how to use the Hadoop Configuration object inside RDD closures?

It seems it can't be done, so here is the code I used:
final String hdfsNameNodePath = "hdfs://quickstart.cloudera:8080";

JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(sourcePath);
if(!imageByteRDD.isEmpty())
    imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,PortableDataStream>>>() {
        @Override
        public void call(Iterator<Tuple2<String, PortableDataStream>> arg0)
                throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", hdfsNameNodePath);
            //the string above should be passed as argument
            SequenceFile.Writer writer = SequenceFile.createWriter(
                    conf,
                    SequenceFile.Writer.file([***ETCETERA...
The problem here is that the Hadoop Configuration is not tagged as Serializable, so Spark will not pull it into RDDs. It is tagged as Writable, so Hadoop's serialization mechanism can marshal and unmarshal it, but Spark does not use that directly.

The long-term fix options would be to:
- Add support for serializing Writables in Spark. Maybe SPARK-2421?
- Make the Hadoop Configuration Serializable.
- Add explicit support for serializing Hadoop configurations.

You wouldn't hit any major objections to making the Hadoop conf Serializable, provided you implement custom ser/deser methods that delegate to the Writable IO calls (and which simply iterate through all the key/value pairs). I say that as a Hadoop committer.
Update: here is the code to create a serializable class which does marshal the contents of a Hadoop configuration. Create it with val ser = new ConfigSerDeser(hadoopConf); refer to it in your RDDs as ser.get(). A short usage sketch follows the class below.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.hadoop.conf.Configuration

/**
 * Class to make Hadoop configurations serializable; uses the
 * `Writable` operations to do this.
 * Note: this only serializes the explicitly set values, not any set
 * in site/default or other XML resources.
 * @param conf the Hadoop configuration to wrap
 */
class ConfigSerDeser(var conf: Configuration) extends Serializable {

  def this() {
    this(new Configuration())
  }

  def get(): Configuration = conf

  // Delegate serialization to the Configuration's own Writable write/readFields calls.
  private def writeObject(out: java.io.ObjectOutputStream): Unit = {
    conf.write(out)
  }

  private def readObject(in: java.io.ObjectInputStream): Unit = {
    conf = new Configuration()
    conf.readFields(in)
  }

  private def readObjectNoData(): Unit = {
    conf = new Configuration()
  }
}
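For completeness, a minimal usage sketch, assuming an existing SparkContext sc and an RDD of HDFS path strings called pathsRdd (both names are illustrative, not from the original answer):

import org.apache.hadoop.fs.{FileSystem, Path}

// Wrap the driver-side configuration once; the wrapper itself is Serializable.
val ser = new ConfigSerDeser(sc.hadoopConfiguration)

// Capture the wrapper (not the Configuration) in the closure and unwrap it per partition.
pathsRdd.foreachPartition { iter =>
  val conf = ser.get()                 // Configuration rebuilt on the executor
  val fs = FileSystem.get(conf)
  iter.foreach { p =>
    println(s"$p exists: " + fs.exists(new Path(p)))   // any FileSystem work goes here
  }
}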
Note that it would be relatively straightforward for someone to make this generic for all Writable classes; you would just need to provide a class name in the constructor and use it to instantiate the Writable during deserialization, as sketched below.
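A minimal sketch of that generic variant, under the assumption that the wrapped Writable has a no-argument constructor (the class and names below are illustrative, not an existing API):

import org.apache.hadoop.io.Writable

class WritableSerDeser[T <: Writable](var value: T) extends Serializable {

  def get(): T = value

  private def writeObject(out: java.io.ObjectOutputStream): Unit = {
    out.writeUTF(value.getClass.getName)   // record which class to re-instantiate
    value.write(out)                       // delegate to the Writable's own serialization
  }

  private def readObject(in: java.io.ObjectInputStream): Unit = {
    val cls = Class.forName(in.readUTF())
    value = cls.getDeclaredConstructor().newInstance().asInstanceOf[T]
    value.readFields(in)                   // repopulate the fresh instance
  }
}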
Based on @Steve's answer, here is a Java implementation.
import java.io.Serializable;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

public class SerializableHadoopConfiguration implements Serializable {
    Configuration conf;

    public SerializableHadoopConfiguration(Configuration hadoopConf) {
        this.conf = hadoopConf;
        if (this.conf == null) {
            this.conf = new Configuration();
        }
    }

    public SerializableHadoopConfiguration() {
        this.conf = new Configuration();
    }

    public Configuration get() {
        return this.conf;
    }

    private void writeObject(java.io.ObjectOutputStream out) throws IOException {
        this.conf.write(out);
    }

    private void readObject(java.io.ObjectInputStream in) throws IOException {
        this.conf = new Configuration();
        this.conf.readFields(in);
    }
}
You can use org.apache.spark.SerializableWritable to serialize and deserialize an org.apache.hadoop.conf.Configuration.

For example:
import org.apache.spark.SerializableWritable
...
val hadoopConf = spark.sparkContext.hadoopConfiguration
// serialize here
val serializedConf = new SerializableWritable(hadoopConf)
// then access the conf by calling .value on serializedConf
rdd.map(someFunction(serializedConf.value))
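Tying this back to the original per-partition SequenceFile use case, here is a hedged sketch; it assumes an rdd of (name, bytes) pairs and an illustrative output path, neither of which comes from the answer above:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, SequenceFile, Text}
import org.apache.spark.SerializableWritable

val serializedConf = new SerializableWritable(spark.sparkContext.hadoopConfiguration)

rdd.foreachPartition { iter =>
  val conf = serializedConf.value        // Configuration deserialized on the executor
  val writer = SequenceFile.createWriter(
    conf,
    SequenceFile.Writer.file(new Path(s"/tmp/out-${java.util.UUID.randomUUID}")),
    SequenceFile.Writer.keyClass(classOf[Text]),
    SequenceFile.Writer.valueClass(classOf[BytesWritable]))
  try {
    iter.foreach { case (name, bytes) =>
      writer.append(new Text(name), new BytesWritable(bytes))
    }
  } finally {
    writer.close()
  }
}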
Looking at Spark's internal codebase, the approach is to broadcast a serialized version of the Hadoop configuration:
import java.net.URI

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local").getOrCreate
val broadcastedHadoopConf = spark.sparkContext.broadcast(
  new org.apache.spark.util.SerializableConfiguration(spark.sparkContext.hadoopConfiguration))

val dfFiles = spark.read.format("binaryFile").load("/somepath").select("path")

val df = dfFiles.map { row =>
  val rawPath = row.getString(0)
  val path = new Path(new URI(rawPath.replace(" ", "%20")))
  // get the Hadoop configuration inside the RDD method
  val hadoopConf = broadcastedHadoopConf.value.value
  val fs = path.getFileSystem(hadoopConf)
  val status = fs.getFileStatus(path)
  val inputStream = fs.open(status.getPath)
  // ... whatever you need to do to read data
}