如何在spark SQL(PySpark)中实现自增
How to implement auto increment in spark SQL(PySpark)
我需要在我的 spark sql table 中实现一个自动递增列,我该怎么做。请指导我。我正在使用 pyspark 2.0
谢谢
卡莲
我会 write/reuse stateful Hive udf 并向 pySpark 注册,因为 Spark SQL 确实对 Hive 有很好的支持。
检查下面代码中的这一行 @UDFType(deterministic = false, stateful = true)
以确保它是有状态 UDF。
package org.apache.hadoop.hive.contrib.udf;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.io.LongWritable;
/**
* UDFRowSequence.
*/
@Description(name = "row_sequence",
value = "_FUNC_() - Returns a generated row sequence number starting from 1")
@UDFType(deterministic = false, stateful = true)
public class UDFRowSequence extends UDF
{
private LongWritable result = new LongWritable();
public UDFRowSequence() {
result.set(0);
}
public LongWritable evaluate() {
result.set(result.get() + 1);
return result;
}
}
// End UDFRowSequence.java
现在构建 jar 并在 pyspark 启动时添加位置。
$ pyspark --jars your_jar_name.jar
然后用sqlContext
注册。
sqlContext.sql("CREATE TEMPORARY FUNCTION row_seq AS 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence'")
现在在 select 查询中使用 row_seq()
sqlContext.sql("SELECT row_seq(), col1, col2 FROM table_name")
我需要在我的 spark sql table 中实现一个自动递增列,我该怎么做。请指导我。我正在使用 pyspark 2.0
谢谢 卡莲
我会 write/reuse stateful Hive udf 并向 pySpark 注册,因为 Spark SQL 确实对 Hive 有很好的支持。
检查下面代码中的这一行 @UDFType(deterministic = false, stateful = true)
以确保它是有状态 UDF。
package org.apache.hadoop.hive.contrib.udf;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.io.LongWritable;
/**
* UDFRowSequence.
*/
@Description(name = "row_sequence",
value = "_FUNC_() - Returns a generated row sequence number starting from 1")
@UDFType(deterministic = false, stateful = true)
public class UDFRowSequence extends UDF
{
private LongWritable result = new LongWritable();
public UDFRowSequence() {
result.set(0);
}
public LongWritable evaluate() {
result.set(result.get() + 1);
return result;
}
}
// End UDFRowSequence.java
现在构建 jar 并在 pyspark 启动时添加位置。
$ pyspark --jars your_jar_name.jar
然后用sqlContext
注册。
sqlContext.sql("CREATE TEMPORARY FUNCTION row_seq AS 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence'")
现在在 select 查询中使用 row_seq()
sqlContext.sql("SELECT row_seq(), col1, col2 FROM table_name")