没有 运行 内存不足的 Spark 打印数据帧
Spark print dataframe Without Running Out Of Memory
如何在 Java 中打印整个数据帧而不 运行 内存不足?
Dataset<Row> df = ...
我知道:
df.show()
将显示数据框,但如果数据框足够大,则可能 运行 内存不足。
我知道我可以使用以下方式限制内容:
df.show(rowCount, false)
但是想要打印整个数据框,我不想限制内容...
我试过:
df.foreachPartition(iter -> {
while(iter.hasNext()){
System.out.println(rowIter.next().mkString(",");)
}
});
但这将打印在每个相应的节点上,而不是驱动程序上...
如果有任何方法可以在不 运行内存不足的情况下打印驱动程序中的所有内容?
据我所知,打印数据框的想法是为了查看数据。
不建议打印大型数据帧,因为数据帧大小可能会导致内存不足。
我提供以下几种方式,如果你想看内容可以保存在hive中table然后查询内容。或写入可读的 csv 或 json
示例:
1) 保存在配置单元中table
df.write.mode("overwrite").saveAsTable("database.tableName")
稍后从配置单元查询 table。
2) csv 或 json
df.write.csv("/your/location/data.csv")
df.write.json("/your/location/data.json")
如果你想使用单个文件,以上将生成多个部分文件coalesce(1)
(但这将再次将数据移动到一个节点,除非你绝对需要它,否则不鼓励这样做)
其他选项是使用 逐行打印,这也会将数据传输到节点...因此这不是个好主意
您将不得不将所有数据带到驱动程序,这会有点消耗您的记忆:(...
一个解决方案可能是拆分您的数据框并在驱动程序中逐个打印。当然,这取决于数据本身的结构,它看起来像:
long count = df.count();
long inc = count / 10;
for (long i = 0; i < count; i += inc) {
Dataset<Row> filteredDf =
df.where("id>=" + i + " AND id<" + (i + inc));
List<Row> rows = filteredDf.collectAsList();
for (Row r : rows) {
System.out.printf("%d: %s\n", r.getAs(0), r.getString(1));
}
}
我将数据集分成 10 个,但我知道我的 ID 是从 1 到 100...
完整的例子可以是:
package net.jgp.books.sparkWithJava.ch20.lab900_splitting_dataframe;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/**
* Splitting a dataframe to bring it back to the driver for local
* processing.
*
* @author jgp
*/
public class SplittingDataframeApp {
/**
* main() is your entry point to the application.
*
* @param args
*/
public static void main(String[] args) {
SplittingDataframeApp app = new SplittingDataframeApp();
app.start();
}
/**
* The processing code.
*/
private void start() {
// Creates a session on a local master
SparkSession spark = SparkSession.builder()
.appName("Splitting a dataframe to collect it")
.master("local")
.getOrCreate();
Dataset<Row> df = createRandomDataframe(spark);
df = df.cache();
df.show();
long count = df.count();
long inc = count / 10;
for (long i = 0; i < count; i += inc) {
Dataset<Row> filteredDf =
df.where("id>=" + i + " AND id<" + (i + inc));
List<Row> rows = filteredDf.collectAsList();
for (Row r : rows) {
System.out.printf("%d: %s\n", r.getAs(0), r.getString(1));
}
}
}
private static Dataset<Row> createRandomDataframe(SparkSession spark) {
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField(
"id",
DataTypes.IntegerType,
false),
DataTypes.createStructField(
"value",
DataTypes.StringType,
false) });
List<Row> rows = new ArrayList<Row>();
for (int i = 0; i < 100; i++) {
rows.add(RowFactory.create(i, "Row #" + i));
}
Dataset<Row> df = spark.createDataFrame(rows, schema);
return df;
}
}
您认为这有帮助吗?
它不像将其保存在数据库中那样优雅,但它可以避免在您的体系结构中添加额外的组件。此代码不是很通用,我不确定您是否可以在当前版本的 Spark 中使其通用。
如何在 Java 中打印整个数据帧而不 运行 内存不足?
Dataset<Row> df = ...
我知道:
df.show()
将显示数据框,但如果数据框足够大,则可能 运行 内存不足。
我知道我可以使用以下方式限制内容:
df.show(rowCount, false)
但是想要打印整个数据框,我不想限制内容...
我试过:
df.foreachPartition(iter -> {
while(iter.hasNext()){
System.out.println(rowIter.next().mkString(",");)
}
});
但这将打印在每个相应的节点上,而不是驱动程序上...
如果有任何方法可以在不 运行内存不足的情况下打印驱动程序中的所有内容?
据我所知,打印数据框的想法是为了查看数据。
不建议打印大型数据帧,因为数据帧大小可能会导致内存不足。
我提供以下几种方式,如果你想看内容可以保存在hive中table然后查询内容。或写入可读的 csv 或 json
示例:
1) 保存在配置单元中table
df.write.mode("overwrite").saveAsTable("database.tableName")
稍后从配置单元查询 table。
2) csv 或 json
df.write.csv("/your/location/data.csv")
df.write.json("/your/location/data.json")
如果你想使用单个文件,以上将生成多个部分文件coalesce(1)
(但这将再次将数据移动到一个节点,除非你绝对需要它,否则不鼓励这样做)
其他选项是使用
您将不得不将所有数据带到驱动程序,这会有点消耗您的记忆:(...
一个解决方案可能是拆分您的数据框并在驱动程序中逐个打印。当然,这取决于数据本身的结构,它看起来像:
long count = df.count();
long inc = count / 10;
for (long i = 0; i < count; i += inc) {
Dataset<Row> filteredDf =
df.where("id>=" + i + " AND id<" + (i + inc));
List<Row> rows = filteredDf.collectAsList();
for (Row r : rows) {
System.out.printf("%d: %s\n", r.getAs(0), r.getString(1));
}
}
我将数据集分成 10 个,但我知道我的 ID 是从 1 到 100...
完整的例子可以是:
package net.jgp.books.sparkWithJava.ch20.lab900_splitting_dataframe;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/**
* Splitting a dataframe to bring it back to the driver for local
* processing.
*
* @author jgp
*/
public class SplittingDataframeApp {
/**
* main() is your entry point to the application.
*
* @param args
*/
public static void main(String[] args) {
SplittingDataframeApp app = new SplittingDataframeApp();
app.start();
}
/**
* The processing code.
*/
private void start() {
// Creates a session on a local master
SparkSession spark = SparkSession.builder()
.appName("Splitting a dataframe to collect it")
.master("local")
.getOrCreate();
Dataset<Row> df = createRandomDataframe(spark);
df = df.cache();
df.show();
long count = df.count();
long inc = count / 10;
for (long i = 0; i < count; i += inc) {
Dataset<Row> filteredDf =
df.where("id>=" + i + " AND id<" + (i + inc));
List<Row> rows = filteredDf.collectAsList();
for (Row r : rows) {
System.out.printf("%d: %s\n", r.getAs(0), r.getString(1));
}
}
}
private static Dataset<Row> createRandomDataframe(SparkSession spark) {
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField(
"id",
DataTypes.IntegerType,
false),
DataTypes.createStructField(
"value",
DataTypes.StringType,
false) });
List<Row> rows = new ArrayList<Row>();
for (int i = 0; i < 100; i++) {
rows.add(RowFactory.create(i, "Row #" + i));
}
Dataset<Row> df = spark.createDataFrame(rows, schema);
return df;
}
}
您认为这有帮助吗?
它不像将其保存在数据库中那样优雅,但它可以避免在您的体系结构中添加额外的组件。此代码不是很通用,我不确定您是否可以在当前版本的 Spark 中使其通用。