为什么pyspark中两种不同的数据处理方式会产生不同的结果?
Why do two different data processing ways in pyspark produce separate results?
我正在尝试从我当前的数据集创建一个样本数据集。我尝试了两种不同的方法,它们产生了两个不同的结果。以某种方式分隔每个采样行应该是整数和字符串([5,unprivate],[1,hiprivate])。第一种方法是为每一行提供字符串和字符串([private,private],[unprivate, hiprivate])。第二种方法是给我正确的输出。
为什么他们要生成两个完全不同的数据集?
数据集
5,unprivate
1,private
2,hiprivate
摄取数据
from pyspark import SparkContext
sc = SparkContext()
INPUT = "./dataset"
def parse_line(line):
bits = line.split(",")
return bits
df = sc.textFile(INPUT).map(parse_line)
第一种方式 - 输出类似
[[u'unprivate', u'unprivate'], [u'unprivate', u'unprivate']]
#1st way
columns = df.first()
new_df = None
for i in range(0, len(columns)):
column = df.sample(withReplacement=True, fraction=1.0).map(lambda row: row[i]).zipWithIndex().map(lambda e: (e[1], [e[0]]))
if new_df is None:
new_df = column
else:
new_df = new_df.join(column)
new_df = new_df.map(lambda e: (e[0], e[1][0] + e[1][1]))
new_df = new_df.map(lambda e: e[1])
print new_df.collect()
第二种方式 - 输出类似
[(0, [u'5', u'unprivate']), (1, [u'1', u'unprivate']), (2, [u'2', u'private'])]
#2nd way
new_df = df.sample(withReplacement=True, fraction=1.0).map(lambda row: row[0]).zipWithIndex().map(lambda e: (e[1], [e[0]]))
new_df2 = df.sample(withReplacement=True, fraction=1.0).map(lambda row: row[1]).zipWithIndex().map(lambda e: (e[1], [e[0]]))
new_df = new_df.join(new_df2)
new_df = new_df.map(lambda e: (e[0], e[1][0] + e[1][1]))
print new_df.collect()
我正在尝试找出第 62 页中的 unisample 函数
http://info.mapr.com/rs/mapr/images/Getting_Started_With_Apache_Spark.pdf
这与Spark如何执行代码有关。在第一个示例中尝试将此打印语句放入您的代码中:
for i in range(0, len(columns)):
if new_df:
print(new_df.take(1))
由于代码是延迟执行的 for
这样的循环将不起作用,因为 Spark 实际上只会执行最后一个循环。因此,当您为第二列启动 for 循环时,您已经获得了 new_df
的值,它等于第二个 for 循环的输出。
您必须使用在第二个示例中使用的方法。
我正在尝试从我当前的数据集创建一个样本数据集。我尝试了两种不同的方法,它们产生了两个不同的结果。以某种方式分隔每个采样行应该是整数和字符串([5,unprivate],[1,hiprivate])。第一种方法是为每一行提供字符串和字符串([private,private],[unprivate, hiprivate])。第二种方法是给我正确的输出。
为什么他们要生成两个完全不同的数据集?
数据集
5,unprivate
1,private
2,hiprivate
摄取数据
from pyspark import SparkContext
sc = SparkContext()
INPUT = "./dataset"
def parse_line(line):
bits = line.split(",")
return bits
df = sc.textFile(INPUT).map(parse_line)
第一种方式 - 输出类似
[[u'unprivate', u'unprivate'], [u'unprivate', u'unprivate']]
#1st way
columns = df.first()
new_df = None
for i in range(0, len(columns)):
column = df.sample(withReplacement=True, fraction=1.0).map(lambda row: row[i]).zipWithIndex().map(lambda e: (e[1], [e[0]]))
if new_df is None:
new_df = column
else:
new_df = new_df.join(column)
new_df = new_df.map(lambda e: (e[0], e[1][0] + e[1][1]))
new_df = new_df.map(lambda e: e[1])
print new_df.collect()
第二种方式 - 输出类似
[(0, [u'5', u'unprivate']), (1, [u'1', u'unprivate']), (2, [u'2', u'private'])]
#2nd way
new_df = df.sample(withReplacement=True, fraction=1.0).map(lambda row: row[0]).zipWithIndex().map(lambda e: (e[1], [e[0]]))
new_df2 = df.sample(withReplacement=True, fraction=1.0).map(lambda row: row[1]).zipWithIndex().map(lambda e: (e[1], [e[0]]))
new_df = new_df.join(new_df2)
new_df = new_df.map(lambda e: (e[0], e[1][0] + e[1][1]))
print new_df.collect()
我正在尝试找出第 62 页中的 unisample 函数 http://info.mapr.com/rs/mapr/images/Getting_Started_With_Apache_Spark.pdf
这与Spark如何执行代码有关。在第一个示例中尝试将此打印语句放入您的代码中:
for i in range(0, len(columns)):
if new_df:
print(new_df.take(1))
由于代码是延迟执行的 for
这样的循环将不起作用,因为 Spark 实际上只会执行最后一个循环。因此,当您为第二列启动 for 循环时,您已经获得了 new_df
的值,它等于第二个 for 循环的输出。
您必须使用在第二个示例中使用的方法。