从字符串文字推断 Spark DataType
Inferring Spark DataType from string literals
我正在尝试编写一个可以根据提供的输入字符串推断 Spark DataTypes 的 Scala 函数:
/**
* Example:
* ========
* toSparkType("string") => StringType
* toSparkType("boolean") => BooleanType
* toSparkType("date") => DateType
* etc.
*/
def toSparkType(inputType : String) : DataType = {
var dt : DataType = null
if(matchesStringRegex(inputType)) {
dt = StringType
} else if(matchesBooleanRegex(inputType)) {
dt = BooleanType
} else if(matchesDateRegex(inputType)) {
dt = DateType
} else if(...) {
...
}
dt
}
我的目标是支持可用 DataTypes
的大部分(如果不是全部的话)。当我开始实现这个功能时,我开始思考:“Spark/Scala 可能已经有一个 helper/util 方法可以为我做这件事。”毕竟,我知道我可以做类似的事情:
var structType = new StructType()
structType.add("some_new_string_col", "string", true, Metadata.empty)
structType.add("some_new_boolean_col", "boolean", true, Metadata.empty)
structType.add("some_new_date_col", "date", true, Metadata.empty)
并且 Scala and/or Spark 会隐式地将我的 "string"
参数转换为 StringType
,等等。所以我问:我可以用这两个 Spark 做什么魔法或者 Scala 来帮助我实现我的转换器方法?
从 scala 开始,你似乎无法神奇地做你想做的事,例如检查这个例子:
import com.scalakata._
@instrument class Playground {
val x = 5
def f[T](v: T) = v
f(x)
val y = "boolean"
f(y)
def manOf[T: Manifest](t: T): Manifest[T] = manifest[T]
println(manOf(y))
}
这是我看完I want to get the type of a variable at runtime后写的。
现在从spark, since I do not have an installation in place right now, I couldn't compose an example, but there is nothing obvious to use, so I would suggest you to continue writing toSparkType()
as you have started, but take a look at the Source code for pyspark.sql.types开始。
你看问题是你总是传递一个字符串。
Spark/Scala probably already have a helper/util method that will do this for me.
你是对的。 Spark 已经有自己的模式和数据类型推断代码,用于从底层数据源(csv、json 等)推断模式。因此您可以查看它来实现自己的(实际实现标记为私有到 Spark 并绑定到 RDD 和内部 类,因此它不能直接从 Spark 外部的代码使用,但应该让你知道如何去做。)
鉴于 csv 是平面类型(并且 json 可以具有嵌套结构),csv 模式推断相对更直接,应该可以帮助您完成上述任务。因此,我将解释 csv 推理的工作原理(json 推理只需要考虑可能的嵌套结构,但数据类型推理非常相似)。
开场白,你想看的是CSVInferSchema object. Particularly, look at the infer
method which takes an RDD[Array[String]]
and infer the data type for each element of the array across the whole of RDD. The way it does is -- it marks each field as NullType
to begin with and then as it iterates over next row of values (Array[String]
) in the RDD
it updates the already inferred DataType
to a new DataType
if the new DataType
is more specific. This is happening here:
val rootTypes: Array[DataType] =
tokenRdd.aggregate(startType)(inferRowType(options), mergeRowTypes)
现在 inferRowType
calls inferField
for each of the field in the row. inferField
implementation 可能是您要查找的内容 - 它采用到目前为止为特定字段推断的类型和当前行的字段的字符串值作为参数。然后 returns 现有的推断类型,或者如果推断出的新类型比新类型更具体。
相关部分代码如下:
typeSoFar match {
case NullType => tryParseInteger(field, options)
case IntegerType => tryParseInteger(field, options)
case LongType => tryParseLong(field, options)
case _: DecimalType => tryParseDecimal(field, options)
case DoubleType => tryParseDouble(field, options)
case TimestampType => tryParseTimestamp(field, options)
case BooleanType => tryParseBoolean(field, options)
case StringType => StringType
case other: DataType =>
throw new UnsupportedOperationException(s"Unexpected data type $other")
}
请注意,如果 typeSoFar
是 NullType,那么它首先尝试将其解析为 Integer
,但 tryParseInteger
调用是对较低类型解析的调用链。因此,如果它无法将值解析为 Integer,那么它将调用 tryParseLong
,失败时将调用 tryParseDecimal
,失败时将调用 tryParseDouble
w.o.f.w.i。 tryParseTimestamp
w.o.f.w.i tryParseBoolean
w.o.f.w.i。终于 stringType
.
因此,无论您的用例是什么,您都可以使用几乎相似的逻辑来实现。 (如果您不需要跨行合并,那么您只需逐字实现所有 tryParse*
方法并简单地调用 tryParseInteger
。无需编写您自己的正则表达式。)
希望这对您有所帮助。
是的,Spark 当然有您需要的魔力。
在 Spark 2.x 中,它是 CatalystSqlParser
对象,定义为 here。
例如:
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
CatalystSqlParser.parseDataType("string") // StringType
CatalystSqlParser.parseDataType("int") // IntegerType
等等。
但据我了解,它不是 public API 的一部分,因此可能会在下一版本中更改而不会发出任何警告。
所以您可以将您的方法实现为:
def toSparkType(inputType: String): DataType = CatalystSqlParser.parseDataType(inputType)
如果您将字符串文字写为数据类型名称,即。 "StringType" , "IntegerType" -
使用此功能 -
def StrtoDatatype(str: String): org.apache.spark.sql.types.DataType = {
val m = ru.runtimeMirror(getClass.getClassLoader)
val module = m.staticModule(s"org.apache.spark.sql.types.$str")
m.reflectModule(module).instance.asInstanceOf[org.apache.spark.sql.types.DataType]
}
如果您有字符串字面值 - string、int 等。
def sqlStrtoDatatype(str: String): org.apache.spark.sql.types.DataType = {
CatalystSqlParser.parseDataType(str)
}
我正在尝试编写一个可以根据提供的输入字符串推断 Spark DataTypes 的 Scala 函数:
/**
* Example:
* ========
* toSparkType("string") => StringType
* toSparkType("boolean") => BooleanType
* toSparkType("date") => DateType
* etc.
*/
def toSparkType(inputType : String) : DataType = {
var dt : DataType = null
if(matchesStringRegex(inputType)) {
dt = StringType
} else if(matchesBooleanRegex(inputType)) {
dt = BooleanType
} else if(matchesDateRegex(inputType)) {
dt = DateType
} else if(...) {
...
}
dt
}
我的目标是支持可用 DataTypes
的大部分(如果不是全部的话)。当我开始实现这个功能时,我开始思考:“Spark/Scala 可能已经有一个 helper/util 方法可以为我做这件事。”毕竟,我知道我可以做类似的事情:
var structType = new StructType()
structType.add("some_new_string_col", "string", true, Metadata.empty)
structType.add("some_new_boolean_col", "boolean", true, Metadata.empty)
structType.add("some_new_date_col", "date", true, Metadata.empty)
并且 Scala and/or Spark 会隐式地将我的 "string"
参数转换为 StringType
,等等。所以我问:我可以用这两个 Spark 做什么魔法或者 Scala 来帮助我实现我的转换器方法?
从 scala 开始,你似乎无法神奇地做你想做的事,例如检查这个例子:
import com.scalakata._
@instrument class Playground {
val x = 5
def f[T](v: T) = v
f(x)
val y = "boolean"
f(y)
def manOf[T: Manifest](t: T): Manifest[T] = manifest[T]
println(manOf(y))
}
这是我看完I want to get the type of a variable at runtime后写的。
现在从spark, since I do not have an installation in place right now, I couldn't compose an example, but there is nothing obvious to use, so I would suggest you to continue writing toSparkType()
as you have started, but take a look at the Source code for pyspark.sql.types开始。
你看问题是你总是传递一个字符串。
Spark/Scala probably already have a helper/util method that will do this for me.
你是对的。 Spark 已经有自己的模式和数据类型推断代码,用于从底层数据源(csv、json 等)推断模式。因此您可以查看它来实现自己的(实际实现标记为私有到 Spark 并绑定到 RDD 和内部 类,因此它不能直接从 Spark 外部的代码使用,但应该让你知道如何去做。)
鉴于 csv 是平面类型(并且 json 可以具有嵌套结构),csv 模式推断相对更直接,应该可以帮助您完成上述任务。因此,我将解释 csv 推理的工作原理(json 推理只需要考虑可能的嵌套结构,但数据类型推理非常相似)。
开场白,你想看的是CSVInferSchema object. Particularly, look at the infer
method which takes an RDD[Array[String]]
and infer the data type for each element of the array across the whole of RDD. The way it does is -- it marks each field as NullType
to begin with and then as it iterates over next row of values (Array[String]
) in the RDD
it updates the already inferred DataType
to a new DataType
if the new DataType
is more specific. This is happening here:
val rootTypes: Array[DataType] =
tokenRdd.aggregate(startType)(inferRowType(options), mergeRowTypes)
现在 inferRowType
calls inferField
for each of the field in the row. inferField
implementation 可能是您要查找的内容 - 它采用到目前为止为特定字段推断的类型和当前行的字段的字符串值作为参数。然后 returns 现有的推断类型,或者如果推断出的新类型比新类型更具体。
相关部分代码如下:
typeSoFar match {
case NullType => tryParseInteger(field, options)
case IntegerType => tryParseInteger(field, options)
case LongType => tryParseLong(field, options)
case _: DecimalType => tryParseDecimal(field, options)
case DoubleType => tryParseDouble(field, options)
case TimestampType => tryParseTimestamp(field, options)
case BooleanType => tryParseBoolean(field, options)
case StringType => StringType
case other: DataType =>
throw new UnsupportedOperationException(s"Unexpected data type $other")
}
请注意,如果 typeSoFar
是 NullType,那么它首先尝试将其解析为 Integer
,但 tryParseInteger
调用是对较低类型解析的调用链。因此,如果它无法将值解析为 Integer,那么它将调用 tryParseLong
,失败时将调用 tryParseDecimal
,失败时将调用 tryParseDouble
w.o.f.w.i。 tryParseTimestamp
w.o.f.w.i tryParseBoolean
w.o.f.w.i。终于 stringType
.
因此,无论您的用例是什么,您都可以使用几乎相似的逻辑来实现。 (如果您不需要跨行合并,那么您只需逐字实现所有 tryParse*
方法并简单地调用 tryParseInteger
。无需编写您自己的正则表达式。)
希望这对您有所帮助。
是的,Spark 当然有您需要的魔力。
在 Spark 2.x 中,它是 CatalystSqlParser
对象,定义为 here。
例如:
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
CatalystSqlParser.parseDataType("string") // StringType
CatalystSqlParser.parseDataType("int") // IntegerType
等等。
但据我了解,它不是 public API 的一部分,因此可能会在下一版本中更改而不会发出任何警告。
所以您可以将您的方法实现为:
def toSparkType(inputType: String): DataType = CatalystSqlParser.parseDataType(inputType)
如果您将字符串文字写为数据类型名称,即。 "StringType" , "IntegerType" - 使用此功能 -
def StrtoDatatype(str: String): org.apache.spark.sql.types.DataType = {
val m = ru.runtimeMirror(getClass.getClassLoader)
val module = m.staticModule(s"org.apache.spark.sql.types.$str")
m.reflectModule(module).instance.asInstanceOf[org.apache.spark.sql.types.DataType]
}
如果您有字符串字面值 - string、int 等。
def sqlStrtoDatatype(str: String): org.apache.spark.sql.types.DataType = {
CatalystSqlParser.parseDataType(str)
}