Slow filtering of pyspark dataframes
I have a question about the time difference when filtering pandas and pyspark dataframes:
import time
import numpy as np
import pandas as pd
from random import shuffle
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = pd.DataFrame(np.random.randint(1000000, size=400000).reshape(-1, 2))
list_filter = list(range(10000))
shuffle(list_filter)
# pandas is fast
t0 = time.time()
df_filtered = df[df[0].isin(list_filter)]
print(time.time() - t0)
# 0.0072
df_spark = spark.createDataFrame(df)
# pyspark is slow
t0 = time.time()
df_spark_filtered = df_spark[df_spark[0].isin(list_filter)]
print(time.time() - t0)
# 3.1232
If I increase the length of list_filter to 10000, the execution times are 0.01353 and 17.6768 seconds. The pandas isin implementation is computationally efficient. Can you explain why filtering a pyspark dataframe is so slow, and how can I perform this kind of filtering quickly?
Spark is designed to work with huge amounts of data. If the data fits in a pandas dataframe, pandas will always be faster. The point is that for very large data pandas fails, while Spark gets the job done (faster than MapReduce, for example).
Spark is usually slower in these cases because it needs to build a DAG of the operations to be performed, i.e. an execution plan, and tries to optimize it.
So you should only consider using Spark when the data is really large; otherwise use pandas, it will be faster.
You can check this article for a comparison between pandas and Spark speed: pandas is always faster until the data is too large for it to handle.
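To see where that time goes, you can ask Spark to print the plan it builds for the isin filter (a minimal sketch, assuming the df_spark and list_filter defined in the question). Spark DataFrames are lazy, so the seconds measured above are spent constructing and analysing a plan that embeds one literal per list element, not scanning the data:
# Sketch: inspect the plan Spark builds for the isin filter
# (assumes df_spark and list_filter from the question above).
filtered = df_spark[df_spark[0].isin(list_filter)]
# No data has been scanned yet, but the logical plan already contains
# every element of list_filter as a literal, which is why merely
# creating the filter takes seconds.
filtered.explain()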
You need to use a join instead of a filter with an isin clause to speed up the filter operation in pyspark:
import time
import numpy as np
import pandas as pd
from random import shuffle
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = pd.DataFrame(np.random.randint(1000000, size=400000).reshape(-1, 2))
df_spark = spark.createDataFrame(df)
list_filter = list(range(10000))
list_filter_df = spark.createDataFrame([[x] for x in list_filter], df_spark.columns[:1])
shuffle(list_filter)
# pandas is fast because everything is in memory
t0 = time.time()
df_filtered = df[df[0].isin(list_filter)]
print(time.time() - t0)
# 0.0227580165863
# 0.0127580165863
# pyspark is slower because of the overhead, but a broadcast join makes it fast compared to isin with a list
t0 = time.time()
df_spark_filtered = df_spark.join(F.broadcast(list_filter_df), df_spark.columns[:1])
print(time.time() - t0)
# 0.0571971035004
# 0.0471971035004
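A closely related variant (a small sketch under the same setup, not part of the original answer, and assuming a reasonably recent Spark version that accepts the "left_semi" join type) is a broadcast left semi join: it keeps only the rows of df_spark whose key appears in list_filter_df, never adds columns from the right side, and does not duplicate rows even if the filter list contained repeated values:
# Sketch: same broadcast idea, expressed as a left semi join so the
# output keeps only df_spark's columns and each matching row appears once.
df_spark_semi = df_spark.join(
    F.broadcast(list_filter_df),
    on=df_spark.columns[:1],
    how="left_semi",
)
df_spark_semi.show(5)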