如何将 spark sql 查询的所有输出保存到文本文件中
How to save all the output of spark sql query into a text file
我正在使用 Spark Streaming 编写一个简单的消费者程序。我的代码将一些数据保存到文件中,但不是所有数据。谁能帮我解决这个问题。我不确定我在哪里丢失数据。我从 kafka 主题获取数据,然后从 java Bean class 应用我的模式。
public class ConsumerFile {
public static void main(String[] args){
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
String topic = args[0];
final String path=new String(args[2]);
String broker = args[1];
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer").setMaster("local[*]");;
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topic.split(",")));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", broker);
JavaPairInputDStream<String, String> kafkaStream = KafkaUtils.createDirectStream(
ssc, String.class, String.class,StringDecoder.class,StringDecoder.class,kafkaPrams,
topicsSet
);
JavaDStream<String> words = kafkaStream.map(new Function<Tuple2<String, String>, String>()
{
public String call(Tuple2<String, String> message)
{
return message._2();}});
words.foreachRDD(
new Function2<JavaRDD<String>, Time, Void>() {
public Void call(JavaRDD<String> rdd, Time time) {
SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
// Convert RDD[String] to RDD[case class] to DataFrame
JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
public JavaRow call(String line) throws Exception{
String[] fields = line.split(",");
JavaRow record = new JavaRow(fields[0], fields[1],fields[2] );
return record;
}
});
DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);
wordsDataFrame.registerTempTable("Data");
DataFrame wDataFrame = sqlContext.sql(" select * from Data");
if(!wDataFrame.rdd().isEmpty()){
wDataFrame.rdd().coalesce(1,true,null).saveAsTextFile(path); }
return null;
}} );
ssc.start();
ssc.awaitTermination();}
}
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("select * from tableName)
df.write.text("/path/to/file")
它将被写成一个分区的文本文件,因此您的结果将分布在一堆标记为 part-00000 的文件中,但它会在那里。
我发现了为什么会这样,以防其他人遇到同样的问题。当您执行 foreachRDD 时,它实际上是在 DStream 的每个 RDD 上执行您的函数,您将它们全部保存到同一个文件中。所以他们会覆盖彼此的数据,第一个或最后一个写入者获胜。最简单的修复方法是将它们保存在一个具有唯一名称的文件中。所以我使用了 saveAsTextFile(path + time().milliseconds().toString()) 并解决了这个问题。但是,您可以两次使用相同的时间戳,所以我通过添加随机数使它更加独特。
这可能是因为您没有指定书写方式。而是使用这个,
df.write.mode('append').text("/path/to/file")
P.s:我在java做的不习惯,我给的是scala/python等价的
我正在使用 Spark Streaming 编写一个简单的消费者程序。我的代码将一些数据保存到文件中,但不是所有数据。谁能帮我解决这个问题。我不确定我在哪里丢失数据。我从 kafka 主题获取数据,然后从 java Bean class 应用我的模式。
public class ConsumerFile {
public static void main(String[] args){
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
String topic = args[0];
final String path=new String(args[2]);
String broker = args[1];
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer").setMaster("local[*]");;
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topic.split(",")));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", broker);
JavaPairInputDStream<String, String> kafkaStream = KafkaUtils.createDirectStream(
ssc, String.class, String.class,StringDecoder.class,StringDecoder.class,kafkaPrams,
topicsSet
);
JavaDStream<String> words = kafkaStream.map(new Function<Tuple2<String, String>, String>()
{
public String call(Tuple2<String, String> message)
{
return message._2();}});
words.foreachRDD(
new Function2<JavaRDD<String>, Time, Void>() {
public Void call(JavaRDD<String> rdd, Time time) {
SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
// Convert RDD[String] to RDD[case class] to DataFrame
JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
public JavaRow call(String line) throws Exception{
String[] fields = line.split(",");
JavaRow record = new JavaRow(fields[0], fields[1],fields[2] );
return record;
}
});
DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);
wordsDataFrame.registerTempTable("Data");
DataFrame wDataFrame = sqlContext.sql(" select * from Data");
if(!wDataFrame.rdd().isEmpty()){
wDataFrame.rdd().coalesce(1,true,null).saveAsTextFile(path); }
return null;
}} );
ssc.start();
ssc.awaitTermination();}
}
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("select * from tableName)
df.write.text("/path/to/file")
它将被写成一个分区的文本文件,因此您的结果将分布在一堆标记为 part-00000 的文件中,但它会在那里。
我发现了为什么会这样,以防其他人遇到同样的问题。当您执行 foreachRDD 时,它实际上是在 DStream 的每个 RDD 上执行您的函数,您将它们全部保存到同一个文件中。所以他们会覆盖彼此的数据,第一个或最后一个写入者获胜。最简单的修复方法是将它们保存在一个具有唯一名称的文件中。所以我使用了 saveAsTextFile(path + time().milliseconds().toString()) 并解决了这个问题。但是,您可以两次使用相同的时间戳,所以我通过添加随机数使它更加独特。
这可能是因为您没有指定书写方式。而是使用这个,
df.write.mode('append').text("/path/to/file")
P.s:我在java做的不习惯,我给的是scala/python等价的