Scala 变量作用域和 for 循环
Scala variable scope and for loops
首先,我是一名 C# 开发人员,对 Scala 还很陌生。我们正在尝试使用 Spark 来查询 SQL 和 Cassandra,这是一个小的概念验证程序。
var output: StringBuilder = new StringBuilder();
try {
//var output: StringBuilder = new StringBuilder();
var config = new Config(args);
val sparkConf = new SparkConf(true)
.set("spark.cassandra.connection.host", config.cassanrdaClusterIp).set("spark.akka.heartbeat.interval", "100")
//var output: StringBuilder = new StringBuilder();
val sparkContext: SparkContext = new SparkContext(config.sparkConnectionString, "springcm-spark-webserver", sparkConf)
val sqlContext = new CassandraSQLContext(sparkContext)
val sqlConnectionString = "jdbc:sqlserver://" + config.sqlConnectionString;
if (args(0) == "DocHistoryReport") {
val docHistoryReport = new DocHistoryReport(sparkContext, sqlContext, sqlConnectionString, config.cassanrdaKeyspace)
// var output: StringBuilder = new StringBuilder();
var result = docHistoryReport.Execute(config.accountId, config.userId, config.startDate, config.endDate, config.dateBucketType);
result.collect();
var file: File = new File("result.csv");
// var output: StringBuilder = new StringBuilder();
if (!file.exists()) {
file.createNewFile();
}
val pw = new PrintWriter(file);
result.foreach(row => {
output.append(row.toString().stripPrefix("[").stripSuffix("]") + sys.props("line.separator"));
})
pw.write(output.toString());
pw.flush();
pw.close;
}
else {
throw new IllegalArgumentException("Unsuported report type " + args(0));
}
}
该代码创建一个 spark 上下文,运行一个简单的报告并将结果写入文件。请注意,变量 output 在代码中被初始化了几次,但除了一个之外,其他都被注释掉了。如果我在当前声明的地方以外的任何地方初始化输出,result.csv 文件将为空,并且在执行 for 循环擦除结果期间输出变量将被重新初始化几次。
有人可以解释一下发生了什么以及为什么变量初始化的位置很重要。谢谢
如果这实际上取决于初始化位置,我会感到非常惊讶:无论如何预期结果都是空文件。
result.foreach(row => {
output.append(row.toString().stripPrefix("[").stripSuffix("]") + sys.props("line.separator"));
})
非常接近this example from the documentation:
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
并且有同样的问题:foreach
(row => ...
)的参数被序列化并发送给每个worker,反序列化时它创建一个新的输出而不是引用原来的StringBuilder
在不同的进程或计算机上。阅读上面的链接部分应该可以帮助您了解更多。
可能的解决方案:
使用collect
获取RDD的所有元素给驱动程序并对结果调用foreach
(事实上你的程序已经这样做了,但是把结果扔掉了).
将用于写入文件的代码放入 foreach
调用中(显然它必须是一个可以从工作机器访问的文件,例如使用 HDFS)。
使用 Accumulator.
使用map
或mapPartition
在每个worker上分别构建你想要的字符串(得到一个RDD[String]
)并应用上述任一解决方案结果。
首先,我是一名 C# 开发人员,对 Scala 还很陌生。我们正在尝试使用 Spark 来查询 SQL 和 Cassandra,这是一个小的概念验证程序。
var output: StringBuilder = new StringBuilder();
try {
//var output: StringBuilder = new StringBuilder();
var config = new Config(args);
val sparkConf = new SparkConf(true)
.set("spark.cassandra.connection.host", config.cassanrdaClusterIp).set("spark.akka.heartbeat.interval", "100")
//var output: StringBuilder = new StringBuilder();
val sparkContext: SparkContext = new SparkContext(config.sparkConnectionString, "springcm-spark-webserver", sparkConf)
val sqlContext = new CassandraSQLContext(sparkContext)
val sqlConnectionString = "jdbc:sqlserver://" + config.sqlConnectionString;
if (args(0) == "DocHistoryReport") {
val docHistoryReport = new DocHistoryReport(sparkContext, sqlContext, sqlConnectionString, config.cassanrdaKeyspace)
// var output: StringBuilder = new StringBuilder();
var result = docHistoryReport.Execute(config.accountId, config.userId, config.startDate, config.endDate, config.dateBucketType);
result.collect();
var file: File = new File("result.csv");
// var output: StringBuilder = new StringBuilder();
if (!file.exists()) {
file.createNewFile();
}
val pw = new PrintWriter(file);
result.foreach(row => {
output.append(row.toString().stripPrefix("[").stripSuffix("]") + sys.props("line.separator"));
})
pw.write(output.toString());
pw.flush();
pw.close;
}
else {
throw new IllegalArgumentException("Unsuported report type " + args(0));
}
}
该代码创建一个 spark 上下文,运行一个简单的报告并将结果写入文件。请注意,变量 output 在代码中被初始化了几次,但除了一个之外,其他都被注释掉了。如果我在当前声明的地方以外的任何地方初始化输出,result.csv 文件将为空,并且在执行 for 循环擦除结果期间输出变量将被重新初始化几次。
有人可以解释一下发生了什么以及为什么变量初始化的位置很重要。谢谢
如果这实际上取决于初始化位置,我会感到非常惊讶:无论如何预期结果都是空文件。
result.foreach(row => {
output.append(row.toString().stripPrefix("[").stripSuffix("]") + sys.props("line.separator"));
})
非常接近this example from the documentation:
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
并且有同样的问题:foreach
(row => ...
)的参数被序列化并发送给每个worker,反序列化时它创建一个新的输出而不是引用原来的StringBuilder
在不同的进程或计算机上。阅读上面的链接部分应该可以帮助您了解更多。
可能的解决方案:
使用
collect
获取RDD的所有元素给驱动程序并对结果调用foreach
(事实上你的程序已经这样做了,但是把结果扔掉了).将用于写入文件的代码放入
foreach
调用中(显然它必须是一个可以从工作机器访问的文件,例如使用 HDFS)。使用 Accumulator.
使用
map
或mapPartition
在每个worker上分别构建你想要的字符串(得到一个RDD[String]
)并应用上述任一解决方案结果。