依赖于公共列的两个数据帧之间的交叉连接
Crossjoin between two dataframes that is dependent on a common column
交叉连接可以按如下方式完成:
df1 = pd.DataFrame({'subgroup':['A','B','C','D']})
df2 = pd.DataFrame({'dates':pd.date_range(date_today, date_today + timedelta(3), freq='D')})
sdf1 = spark.createDataFrame(df1)
sdf2 = spark.createDataFrame(df2)
sdf1.crossJoin(sdf2).toPandas()
在此示例中,有两个数据框,每个数据框包含 4 行,最后,我得到 16 行。
但是,对于我的问题,我想对每个用户进行交叉连接,并且用户是两个数据框中的另一列,例如:
df1 = pd.DataFrame({'user':[1,1,1,1,2,2,2,2],'subgroup':['A','B','C','D','A','B','D','E']})
df2 = pd.DataFrame({'user':[1,1,1,1,2,2,2,2],'dates':np.hstack([np.array(pd.date_range(date_today, date_today + timedelta(3), freq='D')),np.array(pd.date_range(date_today+timedelta(1), date_today + timedelta(4), freq='D'))])})
应用每用户 crossJoin 的结果应该是一个包含 32 行的数据框。这在 pyspark 中可行吗?如何实现?
交叉连接是一种生成行乘法的连接,因为连接键不能唯一地标识行(在我们的例子中,连接键是微不足道的或者根本没有连接键)
让我们从示例数据帧开始:
import pyspark.sql.functions as psf
import pyspark.sql.types as pst
df1 = spark.createDataFrame(
[[user, value] for user, value in zip(5 * list(range(2)), np.random.randint(0, 100, 10).tolist())],
schema=pst.StructType([pst.StructField(c, pst.IntegerType()) for c in ['user', 'value1']]))
df2 = spark.createDataFrame(
[[user, value] for user, value in zip(5 * list(range(2)), np.random.randint(0, 100, 10).tolist())],
schema=pst.StructType([pst.StructField(c, pst.IntegerType()) for c in ['user', 'value2']]))
+----+------+
|user|value1|
+----+------+
| 0| 76|
| 1| 59|
| 0| 14|
| 1| 71|
| 0| 66|
| 1| 61|
| 0| 2|
| 1| 22|
| 0| 16|
| 1| 83|
+----+------+
+----+------+
|user|value2|
+----+------+
| 0| 65|
| 1| 81|
| 0| 60|
| 1| 69|
| 0| 21|
| 1| 61|
| 0| 98|
| 1| 76|
| 0| 40|
| 1| 21|
+----+------+
让我们尝试在常量列上连接数据帧,以查看在常量(普通)列上交叉连接和常规连接之间的等价性:
df = df1.withColumn('key', psf.lit(1)) \
.join(df2.withColumn('key', psf.lit(1)), on=['key'])
我们从 spark > 2 中得到一个错误,因为它意识到我们正在尝试进行交叉连接(笛卡尔积)
Py4JJavaError: An error occurred while calling o1865.showString.
: org.apache.spark.sql.AnalysisException: Detected implicit cartesian product for INNER join between logical plans
LogicalRDD [user#1538, value1#1539], false
and
LogicalRDD [user#1542, value2#1543], false
Join condition is missing or trivial.
Either: use the CROSS JOIN syntax to allow cartesian products between these
relations, or: enable implicit cartesian products by setting the configuration
variable spark.sql.crossJoin.enabled=true;
如果您的连接键(user
此处)不是唯一标识行的列,您也会得到行的乘法,但在每个 user
组中:
df = df1.join(df2, on='user')
print("Number of rows : \tdf1: {} \tdf2: {} \tdf: {}".format(df1.count(), df2.count(), df.count()))
Number of rows : df1: 10 df2: 10 df: 50
+----+------+------+
|user|value1|value2|
+----+------+------+
| 1| 59| 81|
| 1| 59| 69|
| 1| 59| 61|
| 1| 59| 76|
| 1| 59| 21|
| 1| 71| 81|
| 1| 71| 69|
| 1| 71| 61|
| 1| 71| 76|
| 1| 71| 21|
| 1| 61| 81|
| 1| 61| 69|
| 1| 61| 61|
| 1| 61| 76|
| 1| 61| 21|
| 1| 22| 81|
| 1| 22| 69|
| 1| 22| 61|
| 1| 22| 76|
| 1| 22| 21|
+----+------+------+
5 * 5 行用户 0
+ 5 * 5 行用户 1
,因此 50
注意: 使用 self join
后接 filter
通常意味着您应该使用 window 函数 代替。
交叉连接可以按如下方式完成:
df1 = pd.DataFrame({'subgroup':['A','B','C','D']})
df2 = pd.DataFrame({'dates':pd.date_range(date_today, date_today + timedelta(3), freq='D')})
sdf1 = spark.createDataFrame(df1)
sdf2 = spark.createDataFrame(df2)
sdf1.crossJoin(sdf2).toPandas()
在此示例中,有两个数据框,每个数据框包含 4 行,最后,我得到 16 行。
但是,对于我的问题,我想对每个用户进行交叉连接,并且用户是两个数据框中的另一列,例如:
df1 = pd.DataFrame({'user':[1,1,1,1,2,2,2,2],'subgroup':['A','B','C','D','A','B','D','E']})
df2 = pd.DataFrame({'user':[1,1,1,1,2,2,2,2],'dates':np.hstack([np.array(pd.date_range(date_today, date_today + timedelta(3), freq='D')),np.array(pd.date_range(date_today+timedelta(1), date_today + timedelta(4), freq='D'))])})
应用每用户 crossJoin 的结果应该是一个包含 32 行的数据框。这在 pyspark 中可行吗?如何实现?
交叉连接是一种生成行乘法的连接,因为连接键不能唯一地标识行(在我们的例子中,连接键是微不足道的或者根本没有连接键)
让我们从示例数据帧开始:
import pyspark.sql.functions as psf
import pyspark.sql.types as pst
df1 = spark.createDataFrame(
[[user, value] for user, value in zip(5 * list(range(2)), np.random.randint(0, 100, 10).tolist())],
schema=pst.StructType([pst.StructField(c, pst.IntegerType()) for c in ['user', 'value1']]))
df2 = spark.createDataFrame(
[[user, value] for user, value in zip(5 * list(range(2)), np.random.randint(0, 100, 10).tolist())],
schema=pst.StructType([pst.StructField(c, pst.IntegerType()) for c in ['user', 'value2']]))
+----+------+
|user|value1|
+----+------+
| 0| 76|
| 1| 59|
| 0| 14|
| 1| 71|
| 0| 66|
| 1| 61|
| 0| 2|
| 1| 22|
| 0| 16|
| 1| 83|
+----+------+
+----+------+
|user|value2|
+----+------+
| 0| 65|
| 1| 81|
| 0| 60|
| 1| 69|
| 0| 21|
| 1| 61|
| 0| 98|
| 1| 76|
| 0| 40|
| 1| 21|
+----+------+
让我们尝试在常量列上连接数据帧,以查看在常量(普通)列上交叉连接和常规连接之间的等价性:
df = df1.withColumn('key', psf.lit(1)) \
.join(df2.withColumn('key', psf.lit(1)), on=['key'])
我们从 spark > 2 中得到一个错误,因为它意识到我们正在尝试进行交叉连接(笛卡尔积)
Py4JJavaError: An error occurred while calling o1865.showString. : org.apache.spark.sql.AnalysisException: Detected implicit cartesian product for INNER join between logical plans LogicalRDD [user#1538, value1#1539], false and LogicalRDD [user#1542, value2#1543], false Join condition is missing or trivial. Either: use the CROSS JOIN syntax to allow cartesian products between these relations, or: enable implicit cartesian products by setting the configuration variable spark.sql.crossJoin.enabled=true;
如果您的连接键(user
此处)不是唯一标识行的列,您也会得到行的乘法,但在每个 user
组中:
df = df1.join(df2, on='user')
print("Number of rows : \tdf1: {} \tdf2: {} \tdf: {}".format(df1.count(), df2.count(), df.count()))
Number of rows : df1: 10 df2: 10 df: 50
+----+------+------+
|user|value1|value2|
+----+------+------+
| 1| 59| 81|
| 1| 59| 69|
| 1| 59| 61|
| 1| 59| 76|
| 1| 59| 21|
| 1| 71| 81|
| 1| 71| 69|
| 1| 71| 61|
| 1| 71| 76|
| 1| 71| 21|
| 1| 61| 81|
| 1| 61| 69|
| 1| 61| 61|
| 1| 61| 76|
| 1| 61| 21|
| 1| 22| 81|
| 1| 22| 69|
| 1| 22| 61|
| 1| 22| 76|
| 1| 22| 21|
+----+------+------+
5 * 5 行用户 0
+ 5 * 5 行用户 1
,因此 50
注意: 使用 self join
后接 filter
通常意味着您应该使用 window 函数 代替。