Column data value consistency check in PySpark SQL
I have two tables with the same column names, the same data, and the same number of rows, but the rows may be in a different order. Now I want to select column A from table_1 and column A from table_2 and compare the values. How can I achieve this with PySpark SQL? Can I do a sha2/md5 checksum comparison?
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.sql.types import *
from pyspark.sql import Row
import pyspark.sql.functions as f
app_name="test"
table1="DB1.department"
table2="DB2.department"
conf = SparkConf().setAppName(app_name)
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
query1="select * from %s" %(table1)
df1 = sqlContext.sql(query1)
query2="select * from %s" %(table2)
df2 = sqlContext.sql(query2)
# rows whose departmentid exists in only one of the two tables
df3 = sqlContext.sql("""SELECT a.departmentid FROM DB1.department a FULL JOIN
    DB2.department b ON a.departmentid = b.departmentid WHERE a.departmentid
    IS NULL OR b.departmentid IS NULL""")
df5=sqlContext.sql("select md5(departmentid) from department1")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/context.py", line 580, in sql
return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line
813, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 51, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot resolve 'md5(departmentid)'
due to data type mismatch: argument 1 requires binary type, however,
'departmentid' is of bigint type.; line 1 pos 11"
When I try to use an md5 checksum, it says it requires a binary type, but departmentid is bigint.
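(As a side note, the type mismatch itself can be worked around by casting the bigint column before hashing; a minimal sketch, which only hashes individual values and does not compare whole tables:)

# md5 in Spark SQL accepts string/binary input, so cast the bigint column first
df5 = sqlContext.sql("select md5(cast(departmentid as string)) from department1")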
Table 1:
departmentid departmentname departmentaddress
1 A Newyork
2 B Newjersey
3 C SanJose
4 D WashingtonDC
5 E Mexico
6 F Delhi
7 G Pune
8 H chennai
Table 2:
departmentid departmentname departmentaddress
7 G Pune
8 H chennai
1 A Newyork
2 B Newjersey
3 C SanJose
4 D WashingtonDC
5 E Mexico
6 F Delhi
In the second table only the order of the rows has changed; the data is still the same, so technically the two tables are identical. Unless a new row is added or a value is modified, the two tables should be treated as identical (the tables above are just a small example for illustration; in practice we deal with big data).
The simplest solution is:
def is_identical(x, y):
    return (x.count() == y.count()) and (x.subtract(y).count() == 0)
Example data:
df1 = spark.createDataFrame(
[(1, "A", "Newyork"), (2, "B", "Newjersey"),
(3, "C", "SanJose"), (4, "D", "WashingtonDC"), (5, "E", "Mexico"), (6, "F", "Delhi"),
(7, "G", "Pune"), (8, "H", "chennai")],
("departmentid", "departmentname", "departmentadd"))
df2 = spark.createDataFrame(
[(7, "G", "Pune"), (8, "H", "chennai"), (1, "A", "Newyork"), (2, "B", "Newjersey"),
(3, "C", "SanJose"), (4, "D", "WashingtonDC"), (5, "E", "Mexico"), (6, "F", "Delhi")],
("departmentid", "departmentname", "departmentadd"))
df3 = spark.createDataFrame(
[(1, "A", "New York"), (2, "B", "New Jersey"),
(3, "C", "SanJose"), (4, "D", "WashingtonDC"), (5, "E", "Mexico"), (6, "F", "Delhi"),
(7, "G", "Pune"), (8, "H", "chennai")],
("departmentid", "departmentname", "departmentadd"))
df4 = spark.createDataFrame(
[(3, "C", "SanJose"), (4, "D", "WashingtonDC"), (5, "E", "Mexico"), (6, "F", "Delhi")],
("departmentid", "departmentname", "departmentadd"))
Checks:
is_identical(df1, df2)
# True
is_identical(df1, df3)
# False
is_identical(df1, df4)
# False
is_identical(df4, df4)
# True
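One caveat (my own addition, not part of the original answer): subtract compares distinct rows, so if the tables can contain duplicate rows with different multiplicities the check above can pass incorrectly. A minimal sketch, assuming Spark 2.4+ where DataFrame.exceptAll is available:

def is_identical_multiset(x, y):
    # exceptAll respects duplicates, so this also catches tables that contain
    # the same distinct rows but with different multiplicities
    return (x.count() == y.count()) and (x.exceptAll(y).count() == 0)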
With an outer join:
from pyspark.sql.functions import col, coalesce, lit
from functools import reduce
from operator import and_
def is_identical_(x, y, keys=("departmentid", )):
    def both_null(c):
        return (col("x.{}".format(c)).isNull() &
                col("y.{}".format(c)).isNull())

    def both_equal(c):
        return coalesce((col("x.{}".format(c)) ==
                         col("y.{}".format(c))), lit(False))

    p = reduce(and_, [both_null(c) | both_equal(c) for c in x.columns if c not in keys])

    return (x.alias("x").join(y.alias("y"), list(keys), "full_outer")
            .where(~p).count() == 0)
which gives the same results:
is_identical_(df1, df2)
# True
is_identical_(df1, df3)
# False
is_identical_(df1, df4)
# False
is_identical_(df4, df4)
# True
md5 is not suitable for this, because it is not an aggregate function. It computes the checksum of a specific value.
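If you still want a checksum-style comparison, one possible workaround (a sketch of my own, assuming Spark 2.0+ where pyspark.sql.functions.hash is available) is to hash each row and then aggregate the per-row hashes with an order-independent function. Note that hash collisions can in principle produce false positives, so this is only a heuristic, not a proof of equality:

from pyspark.sql import functions as f

def table_checksum(df):
    # Hash every row (casting all columns to string for consistent input),
    # then sum the per-row hashes so that row order does not matter.
    row_hash = f.hash(*[f.col(c).cast("string") for c in df.columns])
    return df.select(f.sum(row_hash).alias("checksum")).first()["checksum"]

table_checksum(df1) == table_checksum(df2)   # True
table_checksum(df1) == table_checksum(df3)   # False (barring hash collisions)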