Pattern matching on a typed Dataset
I am trying to apply different logic depending on the type of a Spark Dataset. Depending on the type of the case class passed to doWork (Customer or Worker), I have to apply a different kind of aggregation. How can I do that?
import org.apache.spark.sql.{Dataset, SparkSession}

object SparkSql extends App {

  val spark = SparkSession
    .builder()
    .appName("Simple app")
    .config("spark.master", "local")
    .getOrCreate()

  import spark.implicits._

  sealed trait Person {
    def name: String
  }

  final case class Customer(override val name: String, email: String) extends Person
  final case class Worker(override val name: String, id: Int, skills: Array[String]) extends Person

  val workers: Dataset[Worker] = Seq(
    Worker("Bob", id = 1, skills = Array("communication", "teamwork")),
    Worker("Sam", id = 1, skills = Array("self-motivation"))
  ).toDS

  def doWork(persons: Dataset[Person]): Unit = {
    persons match {
      case ... // Dataset[Customer] ... do something
      case ... // Dataset[Worker] ... do something else
    }
  }
}
With case classes you can do pattern matching. Case classes are Scala's way of allowing pattern matching on objects without requiring a large amount of boilerplate. Usually, all you need to do is add a single case keyword to each class you want to be able to pattern match on.
For example:
abstract class Expr
case class Var(name: String) extends Expr
case class Number(num: Double) extends Expr
case class UnOp(operator: String, arg: Expr) extends Expr
case class BinOp(operator: String, left: Expr, right: Expr) extends Expr

def simplifyTop(expr: Expr): Expr = expr match {
  case UnOp("-", UnOp("-", e))  => e // Double negation
  case BinOp("+", e, Number(0)) => e // Adding zero
  case BinOp("*", e, Number(1)) => e // Multiplying by one
  case _                        => expr
}
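To see how these patterns fire, a quick hypothetical call (the expression below is made up for illustration) could look like this:

// -(-x) simplifies to x via the "Double negation" case
val simplified = simplifyTop(UnOp("-", UnOp("-", Var("x"))))
println(simplified) // Var(x)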
With your example I would try this:

def doWork(person: Person): Unit = {
  person match {
    case _: Customer => // ... do something
    case _: Worker   => // ... do something else
  }
}

// doWork returns Unit, so foreach fits better than map here
dataset.foreach(doWork)
Modify your method to accept [T <: parent] and extract the bean class name from Dataset.javaRDD, as shown below:
import org.apache.spark.sql.Dataset

object InheritDataframe {

  private def matcherDef[T <: parent](dfb: Dataset[T]): Unit = {
    dfb.toJavaRDD.classTag.toString() match {
      case "child1" => println("child1")
      case "child2" => println("child2")
      case _        => println("Unknown")
    }
  }

  def main(args: Array[String]): Unit = {

    val spark = Constant.getSparkSess
    import spark.implicits._

    val dfB = List(child1(1)).toDS()
    val dfC = List(child2(1)).toDS()

    matcherDef(dfB)
    matcherDef(dfC)
  }
}

case class child1(i: Int) extends parent(i)
case class child2(i: Int) extends parent(i)

class parent(j: Int)
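Note that ClassTag.toString generally yields the fully qualified class name, so depending on the package the printed value may not be the bare child1 / child2. A minimal sketch of a variant that compares runtime classes instead (matcherDefByClass is a made-up name, reusing the classes above):

private def matcherDefByClass[T <: parent](dfb: Dataset[T]): Unit = {
  // compare the runtime class carried by the RDD's ClassTag instead of its string form
  dfb.toJavaRDD.classTag.runtimeClass match {
    case c if c == classOf[child1] => println("child1")
    case c if c == classOf[child2] => println("child2")
    case _                         => println("Unknown")
  }
}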
Try this:
sealed trait Person {
  def name: String
}
final case class Customer(override val name: String, email: String) extends Person
final case class Worker(override val name: String, id: Int, skills: Array[String]) extends Person
Test case:
@Test
def test62262873(): Unit = {
  val workers: Dataset[Worker] = Seq(
    Worker("Bob", id = 1, skills = Array("communication", "teamwork")),
    Worker("Sam", id = 2, skills = Array("self-motivation"))
  ).toDS

  import scala.reflect.runtime.universe._

  def doWork[T: TypeTag](persons: Dataset[T]): Unit = {
    typeOf[T] match {
      case t if t =:= typeOf[Worker] =>
        println("I'm worker")
        persons.as[Worker].filter(_.id == 2).show(false)
      case t if t =:= typeOf[Customer] =>
        println("I'm Customer")
        persons.as[Customer].filter(_.name.contains("B")).show(false)
    }
  }

  doWork(workers)

  /**
    * I'm worker
    * +----+---+-----------------+
    * |name|id |skills           |
    * +----+---+-----------------+
    * |Sam |2  |[self-motivation]|
    * +----+---+-----------------+
    */
}
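The same method also dispatches on a Dataset[Customer]; a minimal sketch, borrowing the sample customers from the accepted solution below:

val customers: Dataset[Customer] = Seq(
  Customer("Bob", "bob@email"),
  Customer("Jack", "jack@email")
).toDS

doWork(customers) // prints "I'm Customer" and shows the rows whose name contains "B"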
I found a solution to my own question, but I want to credit Someshwar Kale's answer, since it fulfils the requirements. In this version I use implicits to create a converter, which I can extend as needed.
import org.apache.spark.sql.{Dataset, SparkSession}

object TempProject extends App {

  val spark = SparkSession
    .builder()
    .appName("Simple app")
    .config("spark.master", "local")
    .getOrCreate()

  import spark.implicits._

  sealed trait Person {
    def name: String
  }

  final case class Customer(override val name: String, email: String) extends Person
  final case class Worker(override val name: String, id: Int, skills: Array[String]) extends Person

  trait CustomDataProcessor[T] {
    def doSomethingCool(dataset: Dataset[T]): Dataset[T]
  }

  implicit object CustomerDataProcessor extends CustomDataProcessor[Customer] {
    override def doSomethingCool(dataset: Dataset[Customer]): Dataset[Customer] =
      dataset.filter(_.name.contains("B"))
  }

  implicit object WorkerDataProcessor extends CustomDataProcessor[Worker] {
    override def doSomethingCool(dataset: Dataset[Worker]): Dataset[Worker] =
      dataset.filter(_.id == 2)
  }

  def doWork[T](person: Dataset[T])(implicit processor: CustomDataProcessor[T]): Unit = {
    processor.doSomethingCool(person)
  }

  val workers: Dataset[Worker] = Seq(
    Worker("Bob", id = 1, skills = Array("communication", "teamwork")),
    Worker("Sam", id = 1, skills = Array("self-motivation"))
  ).toDS

  val customers: Dataset[Customer] = Seq(
    Customer("Bob", "bob@email"),
    Customer("Jack", "jack@email")
  ).toDS

  doWork(workers)
  doWork(customers)
}
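A nice property of this type-class approach is that supporting a new Person subtype only requires another case class and another implicit processor; the call sites stay unchanged, and calling doWork on a type without a processor fails at compile time rather than at runtime. A minimal sketch (Manager and its filter are hypothetical additions placed inside the same object):

// Hypothetical extension: a new Person subtype with its own processor
final case class Manager(override val name: String, reports: Int) extends Person

implicit object ManagerDataProcessor extends CustomDataProcessor[Manager] {
  override def doSomethingCool(dataset: Dataset[Manager]): Dataset[Manager] =
    dataset.filter(_.reports > 3) // keep managers with more than three reports
}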