Scala 变量作用域和 for 循环

Scala variable scope and for loops

首先,我是一名 C# 开发人员,对 Scala 还很陌生。我们正在尝试使用 Spark 来查询 SQL 和 Cassandra,这是一个小的概念验证程序。

 var output: StringBuilder = new StringBuilder();
  try {
    //var output: StringBuilder = new StringBuilder();
    var config = new Config(args);
    val sparkConf = new SparkConf(true)
      .set("spark.cassandra.connection.host", config.cassanrdaClusterIp).set("spark.akka.heartbeat.interval", "100")
    //var output: StringBuilder = new StringBuilder();
    val sparkContext: SparkContext = new SparkContext(config.sparkConnectionString, "springcm-spark-webserver", sparkConf)
    val sqlContext = new CassandraSQLContext(sparkContext)
    val sqlConnectionString = "jdbc:sqlserver://" + config.sqlConnectionString;
    if (args(0) == "DocHistoryReport") {
    val docHistoryReport = new DocHistoryReport(sparkContext, sqlContext, sqlConnectionString, config.cassanrdaKeyspace)
    // var output: StringBuilder = new StringBuilder();
    var result = docHistoryReport.Execute(config.accountId, config.userId, config.startDate, config.endDate, config.dateBucketType);
    result.collect();
    var file: File = new File("result.csv");
    //  var output: StringBuilder = new StringBuilder();
    if (!file.exists()) {
      file.createNewFile();
    }
    val pw = new PrintWriter(file);
    result.foreach(row => {
      output.append(row.toString().stripPrefix("[").stripSuffix("]") + sys.props("line.separator"));
    })
    pw.write(output.toString());
    pw.flush();
    pw.close;
  }
  else {
    throw new IllegalArgumentException("Unsuported report type " + args(0));
  }
}

该代码创建一个 spark 上下文,运行一个简单的报告并将结果写入文件。请注意,变量 output 在代码中被初始化了几次,但除了一个之外,其他都被注释掉了。如果我在当前声明的地方以外的任何地方初始化输出,result.csv 文件将为空,并且在执行 for 循环擦除结果期间输出变量将被重新初始化几次。

有人可以解释一下发生了什么以及为什么变量初始化的位置很重要。谢谢

如果这实际上取决于初始化位置,我会感到非常惊讶:无论如何预期结果都是空文件。

result.foreach(row => {
  output.append(row.toString().stripPrefix("[").stripSuffix("]") + sys.props("line.separator"));
})

非常接近this example from the documentation

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

并且有同样的问题:foreachrow => ...)的参数被序列化并发送给每个worker,反序列化时它创建一个新的输出而不是引用原来的StringBuilder 在不同的进程或计算机上。阅读上面的链接部分应该可以帮助您了解更多。

可能的解决方案:

  1. 使用collect获取RDD的所有元素给驱动程序并对结果调用foreach(事实上你的程序已经这样做了,但是把结果扔掉了).

  2. 将用于写入文件的代码放入 foreach 调用中(显然它必须是一个可以从工作机器访问的文件,例如使用 HDFS)。

  3. 使用 Accumulator.

  4. 使用mapmapPartition在每个worker上分别构建你想要的字符串(得到一个RDD[String])并应用上述任一解决方案结果。