Pyspark - 根据语言过滤行
Pyspark -Filtering the rows based on language
此问题与 Pyspark 有关。我正在阅读一个包含几列的 TSV 文件。一个特定的列是评论列。我的任务是根据语言过滤掉行。例如,如果评论是俄语,那么我想过滤该特定行并将其保存在单独的文件中。
现在在阅读文件时我正在使用下面的代码制作数据帧。
Info = sqlContext.read.format("csv"). \
option("delimiter","\t"). \
option("header", "True"). \
option("inferSchema", "True"). \
load("file.tsv")
DataFrame[ID: int Comments: string]
然后我尝试使用 ORD 函数根据 ASCII 值过滤掉记录:
Info.filter((map(ord,Info.Comments)) < 128).collect()
但是,我收到一个错误:
TypeError: argument 2 to map() must support iteration
示例输入:
Comments
{175:'Аксессуары'}
{156:'Горные'}
{45:'Кровати, диваны и кресла'}
{45:'Кровати, диваны и кресла'}
请提出一些解决方案。任何 help/suggestion 表示赞赏。
已更新:
@ags29
我已经通过编写这段代码纠正了我在评论中提到的错误。
spark_ord=F.udf(lambda x: [ord(c) for c in x],t.ArrayType(IntegerType()))
Info=Info.withColumn('russ', spark_ord('Comments'))
DataFrame[ID: int, Comments: string, russ: array<int>]
现在的问题是创建 Array[Int]。我必须根据数组中存在的值过滤整行,该值应小于 128。
我正在努力实现这一目标。请提出建议。
这没有经过测试,但按照这些思路应该可以工作:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# create user defined function from ord
spark_ord=udf(lambda x: ord(x), IntegerType())
Info=Info.withColumn('ord', spark_ord('Comments'))
Info=Info.filter('ord<128')
基本上,要将 ord
函数与 DataFrame 一起使用,您需要一个用户定义的函数。您尝试的方法需要一个 RDD,而不是一个 DataFrame
@ags29 感谢您的建议。
答案如下:
如上所述通过读取文件创建Dataframe后,我们必须用一些值替换Null值,在这种情况下我将其替换为NA。
InfoWoNull = Info.fillna({'Comments':'NA'})
然后,创建 UDF 以使用 ORD 函数查找字符串中每个字符的 ASCII 值。输出将是整数数组。
from pyspark.sql import functions as F
from pyspark.sql import types as t
from pyspark.sql.types import ArrayType, IntegerType
russ_ord=F.udf(lambda x: [ord(a) for a in x],t.ArrayType(IntegerType()))
创建Filter函数,根据ASCII字符过滤掉大于127的值。
def russian_filter(x):
for index in range(len(x)):
if x[index] > 127:
return True
return False
filter_udf = F.udf(russian_filter, BooleanType())
在最后一步使用它,如下所示。
Info_rus = InfoWoNull.filter(filter_udf(russ_ord('SearchParams')) == 'true')
Info_rus.show()
此问题与 Pyspark 有关。我正在阅读一个包含几列的 TSV 文件。一个特定的列是评论列。我的任务是根据语言过滤掉行。例如,如果评论是俄语,那么我想过滤该特定行并将其保存在单独的文件中。
现在在阅读文件时我正在使用下面的代码制作数据帧。
Info = sqlContext.read.format("csv"). \
option("delimiter","\t"). \
option("header", "True"). \
option("inferSchema", "True"). \
load("file.tsv")
DataFrame[ID: int Comments: string]
然后我尝试使用 ORD 函数根据 ASCII 值过滤掉记录:
Info.filter((map(ord,Info.Comments)) < 128).collect()
但是,我收到一个错误:
TypeError: argument 2 to map() must support iteration
示例输入:
Comments
{175:'Аксессуары'}
{156:'Горные'}
{45:'Кровати, диваны и кресла'}
{45:'Кровати, диваны и кресла'}
请提出一些解决方案。任何 help/suggestion 表示赞赏。
已更新:
@ags29
我已经通过编写这段代码纠正了我在评论中提到的错误。
spark_ord=F.udf(lambda x: [ord(c) for c in x],t.ArrayType(IntegerType()))
Info=Info.withColumn('russ', spark_ord('Comments'))
DataFrame[ID: int, Comments: string, russ: array<int>]
现在的问题是创建 Array[Int]。我必须根据数组中存在的值过滤整行,该值应小于 128。
我正在努力实现这一目标。请提出建议。
这没有经过测试,但按照这些思路应该可以工作:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# create user defined function from ord
spark_ord=udf(lambda x: ord(x), IntegerType())
Info=Info.withColumn('ord', spark_ord('Comments'))
Info=Info.filter('ord<128')
基本上,要将 ord
函数与 DataFrame 一起使用,您需要一个用户定义的函数。您尝试的方法需要一个 RDD,而不是一个 DataFrame
@ags29 感谢您的建议。
答案如下:
如上所述通过读取文件创建Dataframe后,我们必须用一些值替换Null值,在这种情况下我将其替换为NA。
InfoWoNull = Info.fillna({'Comments':'NA'})
然后,创建 UDF 以使用 ORD 函数查找字符串中每个字符的 ASCII 值。输出将是整数数组。
from pyspark.sql import functions as F
from pyspark.sql import types as t
from pyspark.sql.types import ArrayType, IntegerType
russ_ord=F.udf(lambda x: [ord(a) for a in x],t.ArrayType(IntegerType()))
创建Filter函数,根据ASCII字符过滤掉大于127的值。
def russian_filter(x):
for index in range(len(x)):
if x[index] > 127:
return True
return False
filter_udf = F.udf(russian_filter, BooleanType())
在最后一步使用它,如下所示。
Info_rus = InfoWoNull.filter(filter_udf(russ_ord('SearchParams')) == 'true')
Info_rus.show()