如何使用 Spark 2 屏蔽列?
How to mask columns using Spark 2?
我有一些 table 需要屏蔽其中的一些列。要屏蔽的列从 table 到 table 不等,我正在从 application.conf
文件中读取这些列。
例如,员工 table 如下所示
+----+------+-----+---------+
| id | name | age | address |
+----+------+-----+---------+
| 1 | abcd | 21 | India |
+----+------+-----+---------+
| 2 | qazx | 42 | Germany |
+----+------+-----+---------+
如果我们想屏蔽姓名和年龄列,那么我会按顺序获取这些列。
val mask = Seq("name", "age")
屏蔽后的预期值为:
+----+----------------+----------------+---------+
| id | name | age | address |
+----+----------------+----------------+---------+
| 1 | *** Masked *** | *** Masked *** | India |
+----+----------------+----------------+---------+
| 2 | *** Masked *** | *** Masked *** | Germany |
+----+----------------+----------------+---------+
如果我有员工 table 一个数据框,那么屏蔽这些列的方法是什么?
如果我有如下所示的 payment
table 并且想要屏蔽 name
和 salary
列,那么我将序列中的屏蔽列设为
+----+------+--------+----------+
| id | name | salary | tax_code |
+----+------+--------+----------+
| 1 | abcd | 12345 | KT10 |
+----+------+--------+----------+
| 2 | qazx | 98765 | AD12d |
+----+------+--------+----------+
val mask = Seq("name", "salary")
我试过这样的东西 mask.foreach(c => base.withColumn(c, regexp_replace(col(c), "^.*?$", "*** Masked ***" ) ) )
但它没有返回任何东西。
感谢@philantrovert,我找到了解决方案。这是我使用的解决方案:
def maskData(base: DataFrame, maskColumns: Seq[String]) = {
val maskExpr = base.columns.map { col => if(maskColumns.contains(col)) s"'*** Masked ***' as ${col}" else col }
base.selectExpr(maskExpr: _*)
}
请检查下面的代码。关键是udf
函数。
val df = ss.sparkContext.parallelize( Seq (
("c1", "JAN-2017", 49 ),
("c1", "MAR-2017", 83),
)).toDF("city", "month", "sales")
df.show()
val mask = udf( (s : String) => {
"*** Masked ***"
})
df.withColumn("city", mask($"city")).show`
你的陈述
mask.foreach(c => base.withColumn(c, regexp_replace(col(c), "^.*?$", "*** Masked ***" ) ) )
会 return List[org.apache.spark.sql.DataFrame]
听起来不太好。
您可以使用 selectExpr
并使用 :
生成您的 regexp_replace
表达式
base.show
+---+----+-----+-------+
| id|name| age|address|
+---+----+-----+-------+
| 1|abcd|12345| KT10 |
| 2|qazx|98765| AD12d|
+---+----+-----+-------+
val mask = Seq("name", "age")
val expr = df.columns.map { col =>
if (mask.contains(col) ) s"""regexp_replace(${col}, "^.*", "** Masked **" ) as ${col}"""
else col
}
这将为序列 mask
中存在的列生成具有 regex_replace 的表达式
Array[String] = Array(id, regexp_replace(name, "^.*", "** Masked **" ) as name, regexp_replace(age, "^.*", "** Masked **" ) as age, address)
现在您可以在生成的序列
上使用selectExpr
base.selectExpr(expr: _*).show
+---+------------+------------+-------+
| id| name| age|address|
+---+------------+------------+-------+
| 1|** Masked **|** Masked **| KT10 |
| 2|** Masked **|** Masked **| AD12d|
+---+------------+------------+-------+
最简单和最快的方法是使用 withColumn
并简单地用 "*** Masked ***"
覆盖列中的值。使用您的小示例数据框
val df = spark.sparkContext.parallelize( Seq (
(1, "abcd", 12345, "KT10" ),
(2, "qazx", 98765, "AD12d")
)).toDF("id", "name", "salary", "tax_code")
如果要屏蔽的列数量较少且名称已知,那么您可以简单地执行以下操作:
val mask = Seq("name", "salary")
df.withColumn("name", lit("*** Masked ***"))
.withColumn("salary", lit("*** Masked ***"))
否则,您需要创建一个循环:
var df2 = df
for (col <- mask){
df2 = df2.withColumn(col, lit("*** Masked ***"))
}
这两种方法都会给您这样的结果:
+---+--------------+--------------+--------+
| id| name| salary|tax_code|
+---+--------------+--------------+--------+
| 1|*** Masked ***|*** Masked ***| KT10|
| 2|*** Masked ***|*** Masked ***| AD12d|
+---+--------------+--------------+--------+
我有一些 table 需要屏蔽其中的一些列。要屏蔽的列从 table 到 table 不等,我正在从 application.conf
文件中读取这些列。
例如,员工 table 如下所示
+----+------+-----+---------+
| id | name | age | address |
+----+------+-----+---------+
| 1 | abcd | 21 | India |
+----+------+-----+---------+
| 2 | qazx | 42 | Germany |
+----+------+-----+---------+
如果我们想屏蔽姓名和年龄列,那么我会按顺序获取这些列。
val mask = Seq("name", "age")
屏蔽后的预期值为:
+----+----------------+----------------+---------+
| id | name | age | address |
+----+----------------+----------------+---------+
| 1 | *** Masked *** | *** Masked *** | India |
+----+----------------+----------------+---------+
| 2 | *** Masked *** | *** Masked *** | Germany |
+----+----------------+----------------+---------+
如果我有员工 table 一个数据框,那么屏蔽这些列的方法是什么?
如果我有如下所示的 payment
table 并且想要屏蔽 name
和 salary
列,那么我将序列中的屏蔽列设为
+----+------+--------+----------+
| id | name | salary | tax_code |
+----+------+--------+----------+
| 1 | abcd | 12345 | KT10 |
+----+------+--------+----------+
| 2 | qazx | 98765 | AD12d |
+----+------+--------+----------+
val mask = Seq("name", "salary")
我试过这样的东西 mask.foreach(c => base.withColumn(c, regexp_replace(col(c), "^.*?$", "*** Masked ***" ) ) )
但它没有返回任何东西。
感谢@philantrovert,我找到了解决方案。这是我使用的解决方案:
def maskData(base: DataFrame, maskColumns: Seq[String]) = {
val maskExpr = base.columns.map { col => if(maskColumns.contains(col)) s"'*** Masked ***' as ${col}" else col }
base.selectExpr(maskExpr: _*)
}
请检查下面的代码。关键是udf
函数。
val df = ss.sparkContext.parallelize( Seq (
("c1", "JAN-2017", 49 ),
("c1", "MAR-2017", 83),
)).toDF("city", "month", "sales")
df.show()
val mask = udf( (s : String) => {
"*** Masked ***"
})
df.withColumn("city", mask($"city")).show`
你的陈述
mask.foreach(c => base.withColumn(c, regexp_replace(col(c), "^.*?$", "*** Masked ***" ) ) )
会 return List[org.apache.spark.sql.DataFrame]
听起来不太好。
您可以使用 selectExpr
并使用 :
regexp_replace
表达式
base.show
+---+----+-----+-------+
| id|name| age|address|
+---+----+-----+-------+
| 1|abcd|12345| KT10 |
| 2|qazx|98765| AD12d|
+---+----+-----+-------+
val mask = Seq("name", "age")
val expr = df.columns.map { col =>
if (mask.contains(col) ) s"""regexp_replace(${col}, "^.*", "** Masked **" ) as ${col}"""
else col
}
这将为序列 mask
Array[String] = Array(id, regexp_replace(name, "^.*", "** Masked **" ) as name, regexp_replace(age, "^.*", "** Masked **" ) as age, address)
现在您可以在生成的序列
上使用selectExpr
base.selectExpr(expr: _*).show
+---+------------+------------+-------+
| id| name| age|address|
+---+------------+------------+-------+
| 1|** Masked **|** Masked **| KT10 |
| 2|** Masked **|** Masked **| AD12d|
+---+------------+------------+-------+
最简单和最快的方法是使用 withColumn
并简单地用 "*** Masked ***"
覆盖列中的值。使用您的小示例数据框
val df = spark.sparkContext.parallelize( Seq (
(1, "abcd", 12345, "KT10" ),
(2, "qazx", 98765, "AD12d")
)).toDF("id", "name", "salary", "tax_code")
如果要屏蔽的列数量较少且名称已知,那么您可以简单地执行以下操作:
val mask = Seq("name", "salary")
df.withColumn("name", lit("*** Masked ***"))
.withColumn("salary", lit("*** Masked ***"))
否则,您需要创建一个循环:
var df2 = df
for (col <- mask){
df2 = df2.withColumn(col, lit("*** Masked ***"))
}
这两种方法都会给您这样的结果:
+---+--------------+--------------+--------+
| id| name| salary|tax_code|
+---+--------------+--------------+--------+
| 1|*** Masked ***|*** Masked ***| KT10|
| 2|*** Masked ***|*** Masked ***| AD12d|
+---+--------------+--------------+--------+