如何将 List<Objects> 的 byte[] 解码为 spark 中的 Dataset<Row>?
How to decode a byte[] of List<Objects> to Dataset<Row> in spark?
我在我的项目中使用 spark-sql-2.3.1v,kafka 和 java8。
我正在尝试将接收到的主题 byte[] 转换为 kafka 消费者端的数据集。
这是详细信息
我有
class Company{
String companyName;
Integer companyId;
}
我定义为
public static final StructType companySchema = new StructType(
.add("companyName", DataTypes.StringType)
.add("companyId", DataTypes.IntegerType);
但消息定义为
class Message{
private List<Company> companyList;
private String messageId;
}
我试着定义为
StructType messageSchema = new StructType()
.add("companyList", DataTypes.createArrayType(companySchema , false),false)
.add("messageId", DataTypes.StringType);
我使用序列化将消息作为 byte[] 发送到 kafka 主题。
我在 consumer 成功收到消息字节 [] 。
我正在尝试将其转换为数据集??怎么做?
Dataset<Row> messagesDs = kafkaReceivedStreamDs.select(from_json(col("value").cast("string"), messageSchema ).as("messages")).select("messages.*");
messagesDs.printSchema();
root
|-- companyList: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- companyName: string (nullable = true)
| | |-- companyId: integer (nullable = true)
|-- messageId: string (nullable = true)
Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList")));
comapanyListDs.printSchema();
root
|-- col: struct (nullable = true)
| |-- companyName: string (nullable = true)
| |-- companyId: integer (nullable = true)
Dataset<Company> comapanyDs = comapanyListDs.as(Encoders.bean(Company.class));
获取错误:
线程 "main" 中的异常 org.apache.spark.sql.AnalysisException:无法解析给定输入列的“companyName
”:[col];
如何获取数据集记录,如何获取?
你的结构在爆炸时被命名为 "col"。
由于您的 Bean class 没有 "col" 属性,因此它因上述错误而失败。
Exception in thread "main" org.apache.spark.sql.AnalysisException:
cannot resolve 'companyName' given input columns: [col];
您可以执行以下操作 select 以获取相关列作为普通列:
像这样:
Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList"))).
select(col("col.companyName").as("companyName"),col("col.companyId").as("companyId"));
我没有测试过语法,但是一旦从结构中为每一行获取普通列,就必须立即进行下一步。
我在我的项目中使用 spark-sql-2.3.1v,kafka 和 java8。 我正在尝试将接收到的主题 byte[] 转换为 kafka 消费者端的数据集。
这是详细信息
我有
class Company{
String companyName;
Integer companyId;
}
我定义为
public static final StructType companySchema = new StructType(
.add("companyName", DataTypes.StringType)
.add("companyId", DataTypes.IntegerType);
但消息定义为
class Message{
private List<Company> companyList;
private String messageId;
}
我试着定义为
StructType messageSchema = new StructType()
.add("companyList", DataTypes.createArrayType(companySchema , false),false)
.add("messageId", DataTypes.StringType);
我使用序列化将消息作为 byte[] 发送到 kafka 主题。
我在 consumer 成功收到消息字节 [] 。 我正在尝试将其转换为数据集??怎么做?
Dataset<Row> messagesDs = kafkaReceivedStreamDs.select(from_json(col("value").cast("string"), messageSchema ).as("messages")).select("messages.*");
messagesDs.printSchema();
root
|-- companyList: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- companyName: string (nullable = true)
| | |-- companyId: integer (nullable = true)
|-- messageId: string (nullable = true)
Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList")));
comapanyListDs.printSchema();
root
|-- col: struct (nullable = true)
| |-- companyName: string (nullable = true)
| |-- companyId: integer (nullable = true)
Dataset<Company> comapanyDs = comapanyListDs.as(Encoders.bean(Company.class));
获取错误:
线程 "main" 中的异常 org.apache.spark.sql.AnalysisException:无法解析给定输入列的“companyName
”:[col];
如何获取数据集记录,如何获取?
你的结构在爆炸时被命名为 "col"。
由于您的 Bean class 没有 "col" 属性,因此它因上述错误而失败。
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'companyName' given input columns: [col];
您可以执行以下操作 select 以获取相关列作为普通列: 像这样:
Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList"))).
select(col("col.companyName").as("companyName"),col("col.companyId").as("companyId"));
我没有测试过语法,但是一旦从结构中为每一行获取普通列,就必须立即进行下一步。