pyspark 生成特定列的行哈希并将其添加为新列
pyspark generate row hash of specific columns and add it as a new column
我正在使用 spark 2.2.0 和 pyspark2。
我创建了一个 DataFrame df
,现在尝试添加一个新列 "rowhash"
,它是 DataFrame 中特定列的 sha2 哈希。
例如,假设 df
具有以下列:(column1, column2, ..., column10)
我需要 sha2((column2||column3||column4||...... column8), 256)
在新的列中 "rowhash"
。
目前,我尝试使用以下方法:
1) 使用了 hash()
函数,但由于它给出了一个整数输出,所以用处不大
2) 尝试使用 sha2()
函数但失败了。
说 columnarray
有我需要的列数组。
def concat(columnarray):
concat_str = ''
for val in columnarray:
concat_str = concat_str + '||' + str(val)
concat_str = concat_str[2:]
return concat_str
然后是
df1 = df1.withColumn("row_sha2", sha2(concat(columnarray),256))
失败,出现 "cannot resolve" 错误。
谢谢你的回答。由于我只需要散列特定的列,我创建了这些列名称的列表(在 hash_col 中)并将您的函数更改为:
def sha_concat(row, columnarray):
row_dict = row.asDict() #transform row to a dict
concat_str = ''
for v in columnarray:
concat_str = concat_str + '||' + str(row_dict.get(v))
concat_str = concat_str[2:]
#preserve concatenated value for testing (this can be removed later)
row_dict["sha_values"] = concat_str
row_dict["sha_hash"] = hashlib.sha256(concat_str).hexdigest()
return Row(**row_dict)
然后传递为:
df1.rdd.map(lambda row: sha_concat(row,hash_col)).toDF().show(truncate=False)
但是现在失败并出现错误:
UnicodeEncodeError: 'ascii' codec can't encode character u'\ufffd' in position 8: ordinal not in range(128)
我可以在其中一列中看到 \ufffd 的值,所以我不确定是否有办法处理这个问题?
如果您想要数据集不同列中每个值的哈希值,您可以通过 map
将自行设计的函数应用于数据帧的 rdd。
import hashlib
test_df = spark.createDataFrame([
(1,"2",5,1),(3,"4",7,8),
], ("col1","col2","col3","col4"))
def sha_concat(row):
row_dict = row.asDict() #transform row to a dict
columnarray = row_dict.keys() #get the column names
concat_str = ''
for v in row_dict.values():
concat_str = concat_str + '||' + str(v) #concatenate values
concat_str = concat_str[2:]
row_dict["sha_values"] = concat_str #preserve concatenated value for testing (this can be removed later)
row_dict["sha_hash"] = hashlib.sha256(concat_str).hexdigest() #calculate sha256
return Row(**row_dict)
test_df.rdd.map(sha_concat).toDF().show(truncate=False)
结果如下:
+----+----+----+----+----------------------------------------------------------------+----------+
|col1|col2|col3|col4|sha_hash |sha_values|
+----+----+----+----+----------------------------------------------------------------+----------+
|1 |2 |5 |1 |1b0ae4beb8ce031cf585e9bb79df7d32c3b93c8c73c27d8f2c2ddc2de9c8edcd|1||2||5||1|
|3 |4 |7 |8 |cb8f8c5d9fd7165cf3c0f019e0fb10fa0e8f147960c715b7f6a60e149d3923a5|8||4||7||3|
+----+----+----+----+----------------------------------------------------------------+----------+
您可以使用 pyspark.sql.functions.concat_ws()
to concatenate your columns and pyspark.sql.functions.sha2()
获取 SHA256 哈希值。
使用来自@gaw 的数据:
from pyspark.sql.functions import sha2, concat_ws
df = spark.createDataFrame(
[(1,"2",5,1),(3,"4",7,8)],
("col1","col2","col3","col4")
)
df.withColumn("row_sha2", sha2(concat_ws("||", *df.columns), 256)).show(truncate=False)
#+----+----+----+----+----------------------------------------------------------------+
#|col1|col2|col3|col4|row_sha2 |
#+----+----+----+----+----------------------------------------------------------------+
#|1 |2 |5 |1 |1b0ae4beb8ce031cf585e9bb79df7d32c3b93c8c73c27d8f2c2ddc2de9c8edcd|
#|3 |4 |7 |8 |57f057bdc4178b69b1b6ab9d78eabee47133790cba8cf503ac1658fa7a496db1|
#+----+----+----+----+----------------------------------------------------------------+
您可以根据文档将 0
或 256
作为第二个参数传递给 sha2()
:
Returns the hex string result of SHA-2 family of hash functions (SHA-224, SHA-256, SHA-384, and SHA-512). The numBits indicates the desired bit length of the result, which must have a value of 224, 256, 384, 512, or 0 (which is equivalent to 256).
函数concat_ws
接受一个分隔符和一个要连接的列列表。我将 ||
作为分隔符传入,将 df.columns
作为列列表传入。
我在这里使用了所有列,但您可以指定您想要的任何列子集 - 在您的情况下是 columnarray
。 (您需要使用 *
来解压列表。)
2.0 版中的新功能是 hash
函数。
from pyspark.sql.functions import hash
(
spark
.createDataFrame([(1,'Abe'),(2,'Ben'),(3,'Cas')], ('id','name'))
.withColumn('hashed_name', hash('name'))
).show()
结果是:
+---+----+-----------+
| id|name|hashed_name|
+---+----+-----------+
| 1| Abe| 1567000248|
| 2| Ben| 1604243918|
| 3| Cas| -586163893|
+---+----+-----------+
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html#hash
我正在使用 spark 2.2.0 和 pyspark2。
我创建了一个 DataFrame df
,现在尝试添加一个新列 "rowhash"
,它是 DataFrame 中特定列的 sha2 哈希。
例如,假设 df
具有以下列:(column1, column2, ..., column10)
我需要 sha2((column2||column3||column4||...... column8), 256)
在新的列中 "rowhash"
。
目前,我尝试使用以下方法:
1) 使用了 hash()
函数,但由于它给出了一个整数输出,所以用处不大
2) 尝试使用 sha2()
函数但失败了。
说 columnarray
有我需要的列数组。
def concat(columnarray):
concat_str = ''
for val in columnarray:
concat_str = concat_str + '||' + str(val)
concat_str = concat_str[2:]
return concat_str
然后是
df1 = df1.withColumn("row_sha2", sha2(concat(columnarray),256))
失败,出现 "cannot resolve" 错误。
谢谢你的回答。由于我只需要散列特定的列,我创建了这些列名称的列表(在 hash_col 中)并将您的函数更改为:
def sha_concat(row, columnarray):
row_dict = row.asDict() #transform row to a dict
concat_str = ''
for v in columnarray:
concat_str = concat_str + '||' + str(row_dict.get(v))
concat_str = concat_str[2:]
#preserve concatenated value for testing (this can be removed later)
row_dict["sha_values"] = concat_str
row_dict["sha_hash"] = hashlib.sha256(concat_str).hexdigest()
return Row(**row_dict)
然后传递为:
df1.rdd.map(lambda row: sha_concat(row,hash_col)).toDF().show(truncate=False)
但是现在失败并出现错误:
UnicodeEncodeError: 'ascii' codec can't encode character u'\ufffd' in position 8: ordinal not in range(128)
我可以在其中一列中看到 \ufffd 的值,所以我不确定是否有办法处理这个问题?
如果您想要数据集不同列中每个值的哈希值,您可以通过 map
将自行设计的函数应用于数据帧的 rdd。
import hashlib
test_df = spark.createDataFrame([
(1,"2",5,1),(3,"4",7,8),
], ("col1","col2","col3","col4"))
def sha_concat(row):
row_dict = row.asDict() #transform row to a dict
columnarray = row_dict.keys() #get the column names
concat_str = ''
for v in row_dict.values():
concat_str = concat_str + '||' + str(v) #concatenate values
concat_str = concat_str[2:]
row_dict["sha_values"] = concat_str #preserve concatenated value for testing (this can be removed later)
row_dict["sha_hash"] = hashlib.sha256(concat_str).hexdigest() #calculate sha256
return Row(**row_dict)
test_df.rdd.map(sha_concat).toDF().show(truncate=False)
结果如下:
+----+----+----+----+----------------------------------------------------------------+----------+
|col1|col2|col3|col4|sha_hash |sha_values|
+----+----+----+----+----------------------------------------------------------------+----------+
|1 |2 |5 |1 |1b0ae4beb8ce031cf585e9bb79df7d32c3b93c8c73c27d8f2c2ddc2de9c8edcd|1||2||5||1|
|3 |4 |7 |8 |cb8f8c5d9fd7165cf3c0f019e0fb10fa0e8f147960c715b7f6a60e149d3923a5|8||4||7||3|
+----+----+----+----+----------------------------------------------------------------+----------+
您可以使用 pyspark.sql.functions.concat_ws()
to concatenate your columns and pyspark.sql.functions.sha2()
获取 SHA256 哈希值。
使用来自@gaw 的数据:
from pyspark.sql.functions import sha2, concat_ws
df = spark.createDataFrame(
[(1,"2",5,1),(3,"4",7,8)],
("col1","col2","col3","col4")
)
df.withColumn("row_sha2", sha2(concat_ws("||", *df.columns), 256)).show(truncate=False)
#+----+----+----+----+----------------------------------------------------------------+
#|col1|col2|col3|col4|row_sha2 |
#+----+----+----+----+----------------------------------------------------------------+
#|1 |2 |5 |1 |1b0ae4beb8ce031cf585e9bb79df7d32c3b93c8c73c27d8f2c2ddc2de9c8edcd|
#|3 |4 |7 |8 |57f057bdc4178b69b1b6ab9d78eabee47133790cba8cf503ac1658fa7a496db1|
#+----+----+----+----+----------------------------------------------------------------+
您可以根据文档将 0
或 256
作为第二个参数传递给 sha2()
:
Returns the hex string result of SHA-2 family of hash functions (SHA-224, SHA-256, SHA-384, and SHA-512). The numBits indicates the desired bit length of the result, which must have a value of 224, 256, 384, 512, or 0 (which is equivalent to 256).
函数concat_ws
接受一个分隔符和一个要连接的列列表。我将 ||
作为分隔符传入,将 df.columns
作为列列表传入。
我在这里使用了所有列,但您可以指定您想要的任何列子集 - 在您的情况下是 columnarray
。 (您需要使用 *
来解压列表。)
2.0 版中的新功能是 hash
函数。
from pyspark.sql.functions import hash
(
spark
.createDataFrame([(1,'Abe'),(2,'Ben'),(3,'Cas')], ('id','name'))
.withColumn('hashed_name', hash('name'))
).show()
结果是:
+---+----+-----------+
| id|name|hashed_name|
+---+----+-----------+
| 1| Abe| 1567000248|
| 2| Ben| 1604243918|
| 3| Cas| -586163893|
+---+----+-----------+
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html#hash