Parsing JSON messages in Spark from a Kafka stream
I have a stream of events (in the format below) that I need to parse in Spark (Java). I can read the stream, but I could not find an example of converting the messages into a Java bean.
{
  user_id : string,
  session_id : string,
  event : string,
  page : string,
  timestamp : timestamp
}
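For example, a single message on the topic might look like this (values are illustrative):

{"user_id":"u-123","session_id":"s-456","event":"click","page":"/home","timestamp":"2021-01-01T10:00:00Z"}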
The Java bean:
import java.io.Serializable;
import java.sql.Timestamp;

public class Event implements Serializable {
    private String user_id;
    private String session_id;
    private String page;
    private String event;
    private Timestamp timestamp;

    // Getters, setters, and a no-arg constructor are required by Encoders.bean;
    // omitted here for brevity.
}
The code that reads the messages as strings:
Dataset<String> lines = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", topics)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as(Encoders.STRING());
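To sanity-check the raw strings before wiring up the parsing, the stream can be written to the console sink (a minimal sketch for local testing; the append output mode and the missing checkpoint location are assumptions):

import org.apache.spark.sql.streaming.StreamingQuery;

StreamingQuery query = lines
    .writeStream()
    .format("console")     // print each micro-batch to stdout
    .outputMode("append")
    .start();
query.awaitTermination();  // blocks until the query stops or fails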
I was able to get it working with the following:
FlatMapFunction<String, Event> linesToEvents = new FlatMapFunction<String, Event>() {
    @Override
    public Iterator<Event> call(String line) throws JsonMappingException, JsonProcessingException {
        // Each Kafka message holds one JSON document, so one line yields one Event.
        // Creating an ObjectMapper per record avoids serializing it with the closure,
        // at the cost of some per-record overhead.
        ObjectMapper mapper = new ObjectMapper();
        ArrayList<Event> eventList = new ArrayList<>();
        eventList.add(mapper.readValue(line, Event.class));
        return eventList.iterator();
    }
};
Dataset<Event> events = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", topics)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as(Encoders.STRING())
    .flatMap(linesToEvents, Encoders.bean(Event.class));
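An alternative that avoids Jackson entirely is Spark's built-in from_json with an explicit schema; the parsed struct is flattened and mapped onto the bean encoder. This is a sketch that assumes the JSON field names match the bean's fields exactly and that the timestamp string is in a format Spark's default parser accepts:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

StructType schema = new StructType()
    .add("user_id", DataTypes.StringType)
    .add("session_id", DataTypes.StringType)
    .add("event", DataTypes.StringType)
    .add("page", DataTypes.StringType)
    .add("timestamp", DataTypes.TimestampType);

Dataset<Event> events = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", topics)
    .load()
    .select(from_json(col("value").cast("string"), schema).as("data"))  // parse JSON into a struct column
    .select("data.*")                                                   // flatten struct fields to top level
    .as(Encoders.bean(Event.class));

One design note: rows that fail to parse come back as nulls rather than throwing, which can be a friendlier failure mode in a long-running streaming job than an exception inside a flatMap.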