Spark 将 Kafka InputDStream 保存为 Json 文件
Spark save Kafka InputDStream as Json file
我只是想知道 Spark 中是否有一种方法,所以我可以将 JavaInputDStream 保存为 Json 文件,或者通常保存为任何文件。
如果没有,是否还有其他可能保存的内容
一个 kafka 主题作为 Spark 中的一个文件。
非常感谢!
当您将 JavaInputDStream
映射到 stream
时,您可以执行以下操作:
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
@Override
public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
return new Tuple2<>(record.key(), record.value());
}
}).foreachPartition(partition -> {
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
System.out.println(o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
if (partition.hasNext()) {
PrintWriter out = new PrintWriter("filename.txt");;
out.println(text);
try {
while (partition.hasNext()) {
Tuple2<String, String> message = partition.next();
out.println(message);
}
} catch (Exception e) {
e.printStackTrace(
}
});
});
ssc.start();
ssc.awaitTermination();
请不要忘记,如果您的 Kafka 主题中有多个分区,您将按照上述方法为每个分区写入一个文件。
我只是想知道 Spark 中是否有一种方法,所以我可以将 JavaInputDStream 保存为 Json 文件,或者通常保存为任何文件。 如果没有,是否还有其他可能保存的内容 一个 kafka 主题作为 Spark 中的一个文件。
非常感谢!
当您将 JavaInputDStream
映射到 stream
时,您可以执行以下操作:
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
@Override
public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
return new Tuple2<>(record.key(), record.value());
}
}).foreachPartition(partition -> {
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
System.out.println(o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
if (partition.hasNext()) {
PrintWriter out = new PrintWriter("filename.txt");;
out.println(text);
try {
while (partition.hasNext()) {
Tuple2<String, String> message = partition.next();
out.println(message);
}
} catch (Exception e) {
e.printStackTrace(
}
});
});
ssc.start();
ssc.awaitTermination();
请不要忘记,如果您的 Kafka 主题中有多个分区,您将按照上述方法为每个分区写入一个文件。