Spark Structured Streaming / Spark 中的条件爆炸 SQL

Conditional Explode in Spark Structured Streaming / Spark SQL

我正在尝试在 Spark Structured Streaming 中进行条件爆炸。

例如,我的流式数据帧如下所示(完全是这里的数据)。我想在 contingent = 1 时将员工数组分解为单独的数组行。当contingent = 0时,我需要让数组保持原样。

|----------------|---------------------|------------------|
|     Dept ID    |     Employees       |    Contingent    |
|----------------|---------------------|------------------|
|          1     | ["John", "Jane"]    |       1          |
|----------------|---------------------|------------------|
|          4     | ["Amy", "James"]    |       0          |
|----------------|---------------------|------------------|
|          2     | ["David"]           |       1          |
|----------------|---------------------|------------------|

因此,我的输出应该如下所示(我不需要显示 contingent 列:

|----------------|---------------------|
|     Dept ID    |     Employees       |
|----------------|---------------------|
|          1     | ["John"]            |
|----------------|---------------------|
|          1     | ["Jane"]            |
|----------------|---------------------|
|          4     | ["Amy", "James"]    |
|----------------|---------------------|
|          2     | ["David"]           |
|----------------|---------------------|

我目前面临一些挑战:

  1. 有条件地分解数组
  2. 将数组分解成数组(在本例中不是字符串)

在 Hive 中,有一个 UDTF(用户定义的 table 函数)的概念允许我执行此操作。想知道有没有什么可以与之媲美的?

使用flatMap展开并指定你想要的任何条件。

case class Department (Dept_ID: String, Employees: Array[String], Contingent: Int)
case class DepartmentExp (Dept_ID: String, Employees: Array[String])

val ds = df.as[Department]

ds.flatMap(dept => {
  if (dept.Contingent == 1) {
    dept.Employees.map(emp => DepartmentExp(dept.Dept_ID, Array(emp)))
  } else {
    Array(DepartmentExp(dept.Dept_ID, dept.Employees))
  }
}).as[DepartmentExp]