使用 json 模式更新 spark 数据框中的列
Updating column in spark dataframe with json schema
我有 json 个文件,我正在尝试使用 SHA 256 对其中的一个字段进行哈希处理。这些文件位于 AWS S3 上。我目前在 Apache Zeppelin 上使用带有 python 的 spark。
这是我的 json 模式,我正在尝试散列 'mac' 字段;
|-- Document: struct (nullable = true)
| |-- data: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- mac: string (nullable = true)
我已经尝试了一些东西;
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
import hashlib
hcData = sqlc.read.option("inferSchema","true").json(inputPath)
hcData.registerTempTable("hcData")
name = 'Document'
udf = UserDefinedFunction(lambda x: hashlib.sha256(str(x).encode('utf-8')).hexdigest(), StringType())
new_df = hcData.select(*[udf(column).alias(name) if column == name else column for column in hcData.columns])
这段代码工作正常。但是当我尝试散列 mac 字段并更改名称变量时,什么也没有发生;
name = 'Document.data[0].mac'
name = 'mac'
我猜是因为找不到具有给定名称的列。
我尝试稍微更改一下代码;
def valueToCategory(value):
return hashlib.sha256(str(value).encode('utf-8')).hexdigest()
udfValueToCategory = udf(valueToCategory, StringType())
df = hcData.withColumn("Document.data[0].mac",udfValueToCategory("Document.data.mac"))
此代码对 "Document.data.mac" 进行哈希处理,并 使用哈希处理的 mac 地址创建新列 。我想更新现有的专栏。对于那些没有嵌套的变量它可以更新,没有问题,但是对于嵌套变量我找不到更新的方法。
所以基本上,我想用 spark python 散列嵌套 json 文件中的一个字段。谁能知道如何使用 schema 更新 spark dataframe?
好吧,我已经用 scala 找到了我的问题的解决方案。可能有冗余代码,但它仍然有效。
import scala.util.matching.Regex
import java.security.MessageDigest
val inputPath = ""
val outputPath = ""
//finds mac addresses with given regex
def find(s: String, r: Regex): List[String] = {
val l = r.findAllIn(s).toList
if(!l.isEmpty){
return l
} else {
val lis: List[String] = List("null")
return lis
}
}
//hashes given string with sha256
def hash(s: String): String = {
return MessageDigest.getInstance("SHA-256").digest(s.getBytes).map(0xFF & _).map { "%02x".format(_) }.foldLeft(""){_ + _}
}
//hashes given line
def hashAll(s: String, r:Regex): String = {
var st = s
val macs = find(s, r)
for (mac <- macs){
st = st.replaceAll(mac, hash(mac))
}
return st
}
//read data
val rdd = sc.textFile(inputPath)
//mac address regular expression
val regex = "(([0-9A-Z]{1,2}[:-]){5}([0-9A-Z]{1,2}))".r
//hash data
val hashed_rdd = rdd.map(line => hashAll(line, regex))
//write hashed data
hashed_rdd.saveAsTextFile(outputPath)
下面是我的问题的python解决方案。
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
import hashlib
import re
def find(s, r):
l = re.findall(r, s)
if(len(l)!=0):
return l
else:
lis = ["null"]
return lis
def hash(s):
return hashlib.sha256(str(s).encode('utf-8')).hexdigest()
def hashAll(s, r):
st = s
macs = re.findall(r, s)
for mac in macs:
st = st.replace(mac, hash(mac))
return st
rdd = sc.textFile(inputPath)
regex = "([0-9A-Z]{1,2}[:-]){5}([0-9A-Z]{1,2})"
hashed_rdd = rdd.map(lambda line: hashAll(line, regex))
hashed_rdd.saveAsTextFile(outputPath)
我有 json 个文件,我正在尝试使用 SHA 256 对其中的一个字段进行哈希处理。这些文件位于 AWS S3 上。我目前在 Apache Zeppelin 上使用带有 python 的 spark。
这是我的 json 模式,我正在尝试散列 'mac' 字段;
|-- Document: struct (nullable = true)
| |-- data: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- mac: string (nullable = true)
我已经尝试了一些东西;
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
import hashlib
hcData = sqlc.read.option("inferSchema","true").json(inputPath)
hcData.registerTempTable("hcData")
name = 'Document'
udf = UserDefinedFunction(lambda x: hashlib.sha256(str(x).encode('utf-8')).hexdigest(), StringType())
new_df = hcData.select(*[udf(column).alias(name) if column == name else column for column in hcData.columns])
这段代码工作正常。但是当我尝试散列 mac 字段并更改名称变量时,什么也没有发生;
name = 'Document.data[0].mac'
name = 'mac'
我猜是因为找不到具有给定名称的列。
我尝试稍微更改一下代码;
def valueToCategory(value):
return hashlib.sha256(str(value).encode('utf-8')).hexdigest()
udfValueToCategory = udf(valueToCategory, StringType())
df = hcData.withColumn("Document.data[0].mac",udfValueToCategory("Document.data.mac"))
此代码对 "Document.data.mac" 进行哈希处理,并 使用哈希处理的 mac 地址创建新列 。我想更新现有的专栏。对于那些没有嵌套的变量它可以更新,没有问题,但是对于嵌套变量我找不到更新的方法。
所以基本上,我想用 spark python 散列嵌套 json 文件中的一个字段。谁能知道如何使用 schema 更新 spark dataframe?
好吧,我已经用 scala 找到了我的问题的解决方案。可能有冗余代码,但它仍然有效。
import scala.util.matching.Regex
import java.security.MessageDigest
val inputPath = ""
val outputPath = ""
//finds mac addresses with given regex
def find(s: String, r: Regex): List[String] = {
val l = r.findAllIn(s).toList
if(!l.isEmpty){
return l
} else {
val lis: List[String] = List("null")
return lis
}
}
//hashes given string with sha256
def hash(s: String): String = {
return MessageDigest.getInstance("SHA-256").digest(s.getBytes).map(0xFF & _).map { "%02x".format(_) }.foldLeft(""){_ + _}
}
//hashes given line
def hashAll(s: String, r:Regex): String = {
var st = s
val macs = find(s, r)
for (mac <- macs){
st = st.replaceAll(mac, hash(mac))
}
return st
}
//read data
val rdd = sc.textFile(inputPath)
//mac address regular expression
val regex = "(([0-9A-Z]{1,2}[:-]){5}([0-9A-Z]{1,2}))".r
//hash data
val hashed_rdd = rdd.map(line => hashAll(line, regex))
//write hashed data
hashed_rdd.saveAsTextFile(outputPath)
下面是我的问题的python解决方案。
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
import hashlib
import re
def find(s, r):
l = re.findall(r, s)
if(len(l)!=0):
return l
else:
lis = ["null"]
return lis
def hash(s):
return hashlib.sha256(str(s).encode('utf-8')).hexdigest()
def hashAll(s, r):
st = s
macs = re.findall(r, s)
for mac in macs:
st = st.replace(mac, hash(mac))
return st
rdd = sc.textFile(inputPath)
regex = "([0-9A-Z]{1,2}[:-]){5}([0-9A-Z]{1,2})"
hashed_rdd = rdd.map(lambda line: hashAll(line, regex))
hashed_rdd.saveAsTextFile(outputPath)