如何在 spark 的 mySQL 中保存 Dataset<Row>?
How to save Dataset<Row> in mySQL in spark?
我在我的场景中使用 spark 独立集群。我想从 Azure 数据湖读取 JSON 文件并使用 SparkSQL 对其进行一些查询并将结果保存到 mysql 数据库中。我不知道该怎么做。小帮助将是大帮助。
package com.biz.Read_from_ADL;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class App {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example").getOrCreate();
Dataset<Row> df = spark.read().json("adl://pare.azuredatalakestore.net/EXCHANGE_DATA/BITFINEX/ETHBTC/MIDPOINT/BITFINEX_ETHBTC_MIDPOINT_2017-06-25.json");
//df.show();
df.createOrReplaceTempView("trade");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM trade");
sqlDF.show();
}
}
您需要先定义连接属性和jdbc url。
import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("user", "USER_NAME")
connectionProperties.put("password", "PASSWORD")
val jdbc_url = ... // <- use mysql url
import org.apache.spark.sql.SaveMode
spark.sql("select * from diamonds limit 10").withColumnRenamed("table", "table_number")
.write
.mode(SaveMode.Append) // <--- Append to the existing table
.jdbc(jdbc_url, "diamonds_mysql", connectionProperties)
参考here了解更多详情。
我在我的场景中使用 spark 独立集群。我想从 Azure 数据湖读取 JSON 文件并使用 SparkSQL 对其进行一些查询并将结果保存到 mysql 数据库中。我不知道该怎么做。小帮助将是大帮助。
package com.biz.Read_from_ADL;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class App {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example").getOrCreate();
Dataset<Row> df = spark.read().json("adl://pare.azuredatalakestore.net/EXCHANGE_DATA/BITFINEX/ETHBTC/MIDPOINT/BITFINEX_ETHBTC_MIDPOINT_2017-06-25.json");
//df.show();
df.createOrReplaceTempView("trade");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM trade");
sqlDF.show();
}
}
您需要先定义连接属性和jdbc url。
import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("user", "USER_NAME")
connectionProperties.put("password", "PASSWORD")
val jdbc_url = ... // <- use mysql url
import org.apache.spark.sql.SaveMode
spark.sql("select * from diamonds limit 10").withColumnRenamed("table", "table_number")
.write
.mode(SaveMode.Append) // <--- Append to the existing table
.jdbc(jdbc_url, "diamonds_mysql", connectionProperties)
参考here了解更多详情。