Spark Structured Stream中使用自定义数据转换函数
Using self-defined data transform function in Spark Structured Stream
我阅读了以下博客,发现 API 非常有用。
博客中有很多数据选择的例子。就像使用 input
{
"a": {
"b": 1
}
}
应用Scala: events.select("a.b")
,输出将是
{
"b": 1
}
但是博客中没有提到数据类型转换。说我有以下输入:
{
"timestampInSec": "1514917353",
"ip": "123.39.76.112",
"money": "USD256",
"countInString": "6"
}
预期输出为:
{
"timestamp": "2018-01-02 11:22:33",
"ip_long": 2066173040,
"currency": "USD",
"money_amount": 256,
"count": 6
}
有些转换没有包含在org.apache.spark.sql.functions._
中:
- 时间戳为秒,字符串类型
- 将 IP 转换为 long
- 将
USD256
拆分为两列并将其中一列转换为数字
- 将字符串转换为数字
另一件事是错误处理和默认值。如果输入无效,例如:
{
"timestampInSec": "N/A",
"money": "999",
"countInString": "Number-Six"
}
预计输出可以
{
"timestamp": "1970-01-01 00:00:00",
"ip_long": 0,
"currency": "NA",
"money_amount": 999,
"count": -1
}
- 输入
timestampInSec
是 不是数字 。预计使用 0 并创建一个时间戳字符串作为 return 值
ip
在输入中 缺失 。预计使用默认值0.
money
字段不完整。它有金额但缺少货币。预计使用 NA
作为默认货币并正确翻译 money_amount
countInString
不是数字。预计使用 -1
( 而不是 0)作为默认值。
这些需求不常见,需要自定义一些业务逻辑代码。
我确实检查了一些函数,比如 to_timestamp
。有一些代码生成的东西,添加新功能似乎不太容易。有没有写自定义转换函数的guide/document?有没有简单的方法可以满足要求?
对于所有人:
import org.apache.spark.sql.functions._
Timestamp is in second and is a string type
val timestamp = coalesce(
$"timestampInSec".cast("long").cast("timestamp"),
lit(0).cast("timestamp")
).alias("timestamp")
Split USD256 to two columns and convert one of the column to number
val currencyPattern = "^([A-Z]+)?([0-9]+)$"
val currency = (trim(regexp_extract($"money", currencyPattern, 1)) match {
case c => when(length(c) === 0, "NA").otherwise(c)
}).alias("currency")
val amount = regexp_extract($"money", currencyPattern, 2)
.cast("decimal(38, 0)").alias("money_amount")
Convert string to number
val count = coalesce($"countInString".cast("long"), lit(-1)).alias("count")
Convert IP to long
val ipPattern = "^([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})"
val ip_long = coalesce(Seq((1, 24), (2, 16), (3, 8), (4, 0)).map {
case (group, numBits) => shiftLeft(
regexp_extract($"ip", ipPattern, group).cast("long"),
numBits
)
}.reduce(_ + _), lit(0)).alias("ip_long")
结果
val df = Seq(
("1514917353", "123.39.76.112", "USD256", "6"),
("N/A", null, "999", null)
).toDF("timestampInSec", "ip", "money", "countInString")
df.select(timestamp, currency, amount, count, ip_long).show
// +-------------------+--------+------------+-----+----------+
// | timestamp|currency|money_amount|count| ip_long|
// +-------------------+--------+------------+-----+----------+
// |2018-01-02 18:22:33| USD| 256| 6|2066173040|
// |1970-01-01 00:00:00| NA| 999| -1| 0|
// +-------------------+--------+------------+-----+----------+
我阅读了以下博客,发现 API 非常有用。
博客中有很多数据选择的例子。就像使用 input
{
"a": {
"b": 1
}
}
应用Scala: events.select("a.b")
,输出将是
{
"b": 1
}
但是博客中没有提到数据类型转换。说我有以下输入:
{
"timestampInSec": "1514917353",
"ip": "123.39.76.112",
"money": "USD256",
"countInString": "6"
}
预期输出为:
{
"timestamp": "2018-01-02 11:22:33",
"ip_long": 2066173040,
"currency": "USD",
"money_amount": 256,
"count": 6
}
有些转换没有包含在org.apache.spark.sql.functions._
中:
- 时间戳为秒,字符串类型
- 将 IP 转换为 long
- 将
USD256
拆分为两列并将其中一列转换为数字 - 将字符串转换为数字
另一件事是错误处理和默认值。如果输入无效,例如:
{
"timestampInSec": "N/A",
"money": "999",
"countInString": "Number-Six"
}
预计输出可以
{
"timestamp": "1970-01-01 00:00:00",
"ip_long": 0,
"currency": "NA",
"money_amount": 999,
"count": -1
}
- 输入
timestampInSec
是 不是数字 。预计使用 0 并创建一个时间戳字符串作为 return 值 ip
在输入中 缺失 。预计使用默认值0.money
字段不完整。它有金额但缺少货币。预计使用NA
作为默认货币并正确翻译money_amount
countInString
不是数字。预计使用-1
( 而不是 0)作为默认值。
这些需求不常见,需要自定义一些业务逻辑代码。
我确实检查了一些函数,比如 to_timestamp
。有一些代码生成的东西,添加新功能似乎不太容易。有没有写自定义转换函数的guide/document?有没有简单的方法可以满足要求?
对于所有人:
import org.apache.spark.sql.functions._
Timestamp is in second and is a string type
val timestamp = coalesce( $"timestampInSec".cast("long").cast("timestamp"), lit(0).cast("timestamp") ).alias("timestamp")
Split USD256 to two columns and convert one of the column to number
val currencyPattern = "^([A-Z]+)?([0-9]+)$" val currency = (trim(regexp_extract($"money", currencyPattern, 1)) match { case c => when(length(c) === 0, "NA").otherwise(c) }).alias("currency") val amount = regexp_extract($"money", currencyPattern, 2) .cast("decimal(38, 0)").alias("money_amount")
Convert string to number
val count = coalesce($"countInString".cast("long"), lit(-1)).alias("count")
Convert IP to long
val ipPattern = "^([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})" val ip_long = coalesce(Seq((1, 24), (2, 16), (3, 8), (4, 0)).map { case (group, numBits) => shiftLeft( regexp_extract($"ip", ipPattern, group).cast("long"), numBits ) }.reduce(_ + _), lit(0)).alias("ip_long")
结果
val df = Seq(
("1514917353", "123.39.76.112", "USD256", "6"),
("N/A", null, "999", null)
).toDF("timestampInSec", "ip", "money", "countInString")
df.select(timestamp, currency, amount, count, ip_long).show
// +-------------------+--------+------------+-----+----------+
// | timestamp|currency|money_amount|count| ip_long|
// +-------------------+--------+------------+-----+----------+
// |2018-01-02 18:22:33| USD| 256| 6|2066173040|
// |1970-01-01 00:00:00| NA| 999| -1| 0|
// +-------------------+--------+------------+-----+----------+