How to use from_json standard function (in select) in streaming query?
I am processing messages from Kafka with the following JSON structure:
{"unix_time": 1557678233, "category_id": 1000, "ip": "172.10.34.17", "type": "view"}
I want to print out what I receive. Here is the code snippet I have so far:
JavaSparkContext sc = createJavaSparkContext();
JavaStreamingContext streamingContext =
        new JavaStreamingContext(sc, Durations.seconds(BATCH_DURATION_IN_SECONDS));

SparkSession sparkSession = SparkSession
        .builder()
        .config(new SparkConf())
        .getOrCreate();

Dataset<Row> df = sparkSession
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", CommonUtils.KAFKA_HOST_PORT)
        .option("subscribe", KAFKA_TOPIC)
        .load();

StreamingQuery query = df.selectExpr("CAST(value AS STRING)")
        .select(from_json(new Column("value"), getSchema())).as("data")
        .select("data.category_id")
        .writeStream()
        .foreach(new ForeachWriter<Row>() {
            @Override
            public void process(Row value) {
                System.out.println(value);
            }

            @Override
            public void close(Throwable errorOrNull) {
            }

            @Override
            public boolean open(long partitionId, long version) {
                return true;
            }
        })
        .start();

query.awaitTermination();
The schema method:
private static StructType getSchema() {
    return new StructType(new StructField[]{
            new StructField(UNIX_TIME, DataTypes.TimestampType, false, Metadata.empty()),
            new StructField(CATEGORY_ID, DataTypes.IntegerType, false, Metadata.empty()),
            new StructField(IP, DataTypes.StringType, false, Metadata.empty()),
            new StructField(TYPE, DataTypes.StringType, false, Metadata.empty()),
    });
}
The problem is that I keep getting this error when writing from Spark:
Exception in thread "main" org.apache.spark.sql.AnalysisException:
cannot resolve 'data.category_id' given input columns: [jsontostruct(value)];;
'Project ['data.category_id]
+- SubqueryAlias data
   +- Project [jsontostruct(StructField(unix_time,TimestampType,false), StructField(category_id,IntegerType,false), StructField(ip,StringType,false), StructField(type,StringType,false), value#15) AS jsontostruct(value)#18]
How can I overcome this issue? Any suggestions?
This part of the exception tells you where to look for the answer:
cannot resolve 'data.category_id' given input columns: [jsontostruct(value)]
In other words, there is no data.category_id column among the available columns; there is only jsontostruct(value).

That means the only select in the streaming query does not work as intended. The reason is simple (I would call it a typo): there is one closing parenthesis too many before as("data"), and since an as method is available on both the Column and Dataset types, the call silently applies to the Dataset rather than to the Column, creating the SubqueryAlias you see in the plan instead of naming the struct column.
In summary, replace the following part of the query:
.select(from_json(new Column("value"), getSchema())).as("data")
with the following:
.select(from_json(new Column("value"), getSchema()).as("data"))
Note that I moved one closing parenthesis to the end.
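As an aside, the same projection can be written entirely inside selectExpr with a DDL-style schema string, which sidesteps the Column/Dataset parenthesis pitfall altogether. This is only a sketch, assuming Spark 2.3+ (where from_json is exposed as a SQL function accepting a DDL-format schema) and reusing the df dataset from the question; the variable name categories is mine:

```java
// Sketch: from_json as a SQL expression. The alias "data" lives inside
// the expression string, so it can only ever name the struct column.
Dataset<Row> categories = df
        .selectExpr("CAST(value AS STRING) AS value")
        .selectExpr("from_json(value, 'unix_time TIMESTAMP, category_id INT, ip STRING, type STRING') AS data")
        .select("data.category_id");
```

With either variant, data is a struct column, so data.category_id resolves in the subsequent select.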