如何从二进制 protobuf 中提取 JSON?
How to extract JSON from a binary protobuf?
将 Apache Spark 2.2.0 结构化流视为:
jsonStream.printSchema()
root
|-- body: binary (nullable = true)
body 中的数据是 Protocol Buffers v2 类型和嵌套 JSON。看起来像
syntax = "proto2";
message Data {
required string data = 1;
}
message List {
repeated Data entry = 1;
}
如何提取 Spark 中的数据以 "further" 处理它?
我查看了 ScalaPB,但是当我 运行 时,我在 Jupyter 中的代码无法将“.proto”代码内联。我也不知道如何将 DataFrame 转换为流上的 RDD。由于流媒体源,尝试 .rdd
失败。
更新 1:我想出了如何使用 ScalaPB 的控制台工具从 protobuf 规范生成 Scala 文件。从 "type mismatch".
开始,我仍然无法导入它们
tl;dr 编写一个 user-defined 函数(UDF)将二进制字段(带有 JSON 的 protobuf)反序列化为 JSON.
将序列化的 body
(采用 binary
格式)视为 table 列。暂时忘掉结构化流(和流数据集)。
然后让我将问题改写如下:
How to convert (aka cast) a value in binary to [here-your-format]?
一些格式直接 cast
-able 这使得将二进制文件转换为字符串很容易,如下所示:
$"body" cast "string"
如果字符串是 JSON 或 unixtime,您可以使用 built-in“转换器”,即 functions,如 from_json
或 from_unixtime
。
介绍应该会提示您如何像您一样进行转换。
The data inside body is of type Protocol Buffers v2 and a nested JSON.
要处理此类字段 (protobuf + json),您必须编写一个 Scala 函数来将“有效负载”解码为 JSON 并创建一个 user-defined 函数( UDF) 使用 udf:
udf(f: UDF1[_, _], returnType: DataType): UserDefinedFunction Defines a Java UDF1 instance as user-defined function (UDF). The caller must specify the output data type, and there is no automatic input type coercion. By default the returned UDF is deterministic. To change it to nondeterministic, call the API UserDefinedFunction.asNondeterministic()
.
然后使用 from_json
或 get_json_object
等函数。
为了简化您的案例,编写一个 single-argument 函数来执行转换并使用 udf
函数将其包装到 UDF 中。
Trying .rdd failed because of a streaming source.
使用Dataset.foreach or foreachPartition.
foreach(f: (T) ⇒ Unit): Unit Applies a function f to all rows.
foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit Applies a function f to each partition of this Dataset.
将 Apache Spark 2.2.0 结构化流视为:
jsonStream.printSchema()
root
|-- body: binary (nullable = true)
body 中的数据是 Protocol Buffers v2 类型和嵌套 JSON。看起来像
syntax = "proto2";
message Data {
required string data = 1;
}
message List {
repeated Data entry = 1;
}
如何提取 Spark 中的数据以 "further" 处理它?
我查看了 ScalaPB,但是当我 运行 时,我在 Jupyter 中的代码无法将“.proto”代码内联。我也不知道如何将 DataFrame 转换为流上的 RDD。由于流媒体源,尝试 .rdd
失败。
更新 1:我想出了如何使用 ScalaPB 的控制台工具从 protobuf 规范生成 Scala 文件。从 "type mismatch".
开始,我仍然无法导入它们tl;dr 编写一个 user-defined 函数(UDF)将二进制字段(带有 JSON 的 protobuf)反序列化为 JSON.
将序列化的 body
(采用 binary
格式)视为 table 列。暂时忘掉结构化流(和流数据集)。
然后让我将问题改写如下:
How to convert (aka cast) a value in binary to [here-your-format]?
一些格式直接 cast
-able 这使得将二进制文件转换为字符串很容易,如下所示:
$"body" cast "string"
如果字符串是 JSON 或 unixtime,您可以使用 built-in“转换器”,即 functions,如 from_json
或 from_unixtime
。
介绍应该会提示您如何像您一样进行转换。
The data inside body is of type Protocol Buffers v2 and a nested JSON.
要处理此类字段 (protobuf + json),您必须编写一个 Scala 函数来将“有效负载”解码为 JSON 并创建一个 user-defined 函数( UDF) 使用 udf:
udf(f: UDF1[_, _], returnType: DataType): UserDefinedFunction Defines a Java UDF1 instance as user-defined function (UDF). The caller must specify the output data type, and there is no automatic input type coercion. By default the returned UDF is deterministic. To change it to nondeterministic, call the API
UserDefinedFunction.asNondeterministic()
.
然后使用 from_json
或 get_json_object
等函数。
为了简化您的案例,编写一个 single-argument 函数来执行转换并使用 udf
函数将其包装到 UDF 中。
Trying .rdd failed because of a streaming source.
使用Dataset.foreach or foreachPartition.
foreach(f: (T) ⇒ Unit): Unit Applies a function f to all rows.
foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit Applies a function f to each partition of this Dataset.