Parsing JSON messages in Spark from a Kafka stream

I have a stream of events (in the format below) that I need to parse in Spark (Java). I can read the stream, but I couldn't find an example of converting the messages into a Java bean.

{
    user_id  : string,
    session_id : string,
    event : string,
    page : string,
    timestamp : timestamp
}

The Java bean:

import java.io.Serializable;
import java.sql.Timestamp;

public class Event implements Serializable {

    private String user_id;
    private String session_id;
    private String page;
    private String event;
    private Timestamp timestamp;

    // Getters and setters omitted for brevity; Encoders.bean(Event.class)
    // maps columns through the JavaBean getter/setter pairs, so they are required.
}

The code that reads the messages as strings:

Dataset<String> lines = spark
                        .readStream()
                        .format("kafka")
                        .option("kafka.bootstrap.servers", "localhost:9092")
                        .option("subscribe", topics)
                        .load()
                        .selectExpr("CAST(value AS STRING)")
                        .as(Encoders.STRING());     
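
One approach that comes up is Spark's built-in from_json with an explicit schema, which avoids a hand-written mapper entirely. A rough sketch (the StructType below is my guess at mapping the format above; spark and topics are the same as in the snippet before):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Schema assumed to match the event format above
StructType schema = new StructType()
        .add("user_id", DataTypes.StringType)
        .add("session_id", DataTypes.StringType)
        .add("event", DataTypes.StringType)
        .add("page", DataTypes.StringType)
        .add("timestamp", DataTypes.TimestampType);

Dataset<Event> events = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", topics)
        .load()
        .selectExpr("CAST(value AS STRING) AS value")
        // Parse the JSON string into a struct column, then flatten it
        .select(from_json(col("value"), schema).as("data"))
        .select("data.*")
        .as(Encoders.bean(Event.class));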

In the end, I was able to get it working with the following.

import java.util.ArrayList;
import java.util.Iterator;
import org.apache.spark.api.java.function.FlatMapFunction;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

FlatMapFunction<String, Event> linesToEvents = new FlatMapFunction<String, Event>() {
    @Override
    public Iterator<Event> call(String line) throws JsonMappingException, JsonProcessingException {
        // Deserialize one JSON line into a single Event with Jackson
        // (creating a mapper per record works, but reusing one instance would be cheaper)
        ObjectMapper mapper = new ObjectMapper();
        ArrayList<Event> eventList = new ArrayList<>();
        eventList.add(mapper.readValue(line, Event.class));
        return eventList.iterator();
    }
};

Dataset<Event> events = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", topics)
        .load()
        .selectExpr("CAST(value AS STRING)")
        .as(Encoders.STRING())
        .flatMap(linesToEvents, Encoders.bean(Event.class));
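
To actually run the stream, the resulting Dataset still needs a sink. A minimal sketch that just prints the parsed events to the console (the output mode and console sink here are my choice for testing, not part of the original setup):

import org.apache.spark.sql.streaming.StreamingQuery;

// Write the parsed events to the console sink; start() kicks off the query
StreamingQuery query = events
        .writeStream()
        .outputMode("append")
        .format("console")
        .start();

// Blocks until the query is stopped; throws StreamingQueryException
query.awaitTermination();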